Skip to content

Commit

Permalink
[FLINK-31215] Backpropagate processing rate limits from non-scalable …
Browse files Browse the repository at this point in the history
…bottlenecks to upstream operators
  • Loading branch information
aplyusnin committed Jul 3, 2024
1 parent f2adb15 commit d25a4fc
Show file tree
Hide file tree
Showing 7 changed files with 1,045 additions and 264 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.autoscaler;

import org.apache.flink.runtime.jobgraph.JobVertexID;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Class for storing intermediate scaling results. */
public class IntermediateScalingResult {

private final Map<JobVertexID, ScalingSummary> scalingSummaries;
private final List<JobVertexID> bottlenecks;

private double backpropagationScaleFactor = 1.0;

public IntermediateScalingResult() {
scalingSummaries = new HashMap<>();
bottlenecks = new ArrayList<>();
}

void addScalingSummary(JobVertexID vertex, ScalingSummary scalingSummary) {
scalingSummaries.put(vertex, scalingSummary);
}

void addBottleneckVertex(JobVertexID bottleneck, double factor) {
bottlenecks.add(bottleneck);
backpropagationScaleFactor = Math.min(backpropagationScaleFactor, factor);
}

public List<JobVertexID> getBottlenecks() {
return bottlenecks;
}

public double getBackpropagationScaleFactor() {
return backpropagationScaleFactor;
}

public Map<JobVertexID, ScalingSummary> getScalingSummaries() {
return scalingSummaries;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,31 @@ public JobVertexScaler(AutoScalerEventHandler<KEY, Context> autoScalerEventHandl
this.autoScalerEventHandler = autoScalerEventHandler;
}

public int computeScaleTargetParallelism(
public VertexScalingResult computeScaleTargetParallelism(
Context context,
JobVertexID vertex,
Collection<ShipStrategy> inputShipStrategies,
Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
SortedMap<Instant, ScalingSummary> history,
Duration restartTime) {
Duration restartTime,
double backpropagationScaleFactor) {
var conf = context.getConfiguration();

boolean excluded =
conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS).contains(vertex.toHexString());
if (excluded) {
LOG.debug(
"Vertex {} is part of `vertex.exclude.ids` config, Check for bottleneck but not scale",
vertex);
}

var currentParallelism = (int) evaluatedMetrics.get(PARALLELISM).getCurrent();
double averageTrueProcessingRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
if (Double.isNaN(averageTrueProcessingRate)) {
LOG.warn(
"True processing rate is not available for {}, cannot compute new parallelism",
vertex);
return currentParallelism;
return VertexScalingResult.normalScaling(currentParallelism);
}

double targetCapacity =
Expand All @@ -95,9 +105,11 @@ public int computeScaleTargetParallelism(
LOG.warn(
"Target data rate is not available for {}, cannot compute new parallelism",
vertex);
return currentParallelism;
return VertexScalingResult.normalScaling(currentParallelism);
}

targetCapacity *= backpropagationScaleFactor;

LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity);
double scaleFactor = targetCapacity / averageTrueProcessingRate;
double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
Expand All @@ -122,32 +134,44 @@ public int computeScaleTargetParallelism(
double cappedTargetCapacity = averageTrueProcessingRate * scaleFactor;
LOG.debug("Capped target processing capacity for {} is {}", vertex, cappedTargetCapacity);

int newParallelism =
int parallelismLowerLimit =
excluded
? currentParallelism
: Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM));
int parallelismUpperLimit =
excluded
? currentParallelism
: Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM));

var scalingResult =
scale(
currentParallelism,
inputShipStrategies,
(int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
scaleFactor,
Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM)),
Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)));
parallelismLowerLimit,
parallelismUpperLimit);

if (newParallelism == currentParallelism
if (scalingResult.getParallelism() == currentParallelism
|| blockScalingBasedOnPastActions(
context,
vertex,
conf,
evaluatedMetrics,
history,
currentParallelism,
newParallelism)) {
return currentParallelism;
scalingResult.getParallelism())) {
return new VertexScalingResult(
currentParallelism,
scalingResult.getBottleneckScaleFactor(),
scalingResult.isBottleneck());
}

// We record our expectations for this scaling operation
evaluatedMetrics.put(
ScalingMetric.EXPECTED_PROCESSING_RATE,
EvaluatedScalingMetric.of(cappedTargetCapacity));
return newParallelism;
return scalingResult;
}

private boolean blockScalingBasedOnPastActions(
Expand Down Expand Up @@ -249,9 +273,12 @@ private boolean detectIneffectiveScaleUp(
* <p>Also, in order to ensure the data is evenly spread across subtasks, we try to adjust the
* parallelism for source and keyed vertex such that it divides the maxParallelism without a
* remainder.
*
* <p>If newParallelism exceeds min(parallelismUpperLimit, maxParallelism) the job vertex
* considered to be a bottleneck.
*/
@VisibleForTesting
protected static int scale(
protected static VertexScalingResult scale(
int currentParallelism,
Collection<ShipStrategy> inputShipStrategies,
int maxParallelism,
Expand Down Expand Up @@ -284,26 +311,36 @@ protected static int scale(
// parallelism upper limit
final int upperBound = Math.min(maxParallelism, parallelismUpperLimit);

boolean isBottleneck = false;
double bottleneckScaleFactor = 1.0;

// If required parallelism is higher than upper bound ---> the vertex is a bottleneck
if (newParallelism > upperBound) {
isBottleneck = true;
bottleneckScaleFactor = (double) upperBound / newParallelism;
newParallelism = upperBound;
}

// Apply min/max parallelism
newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);
newParallelism = Math.max(parallelismLowerLimit, newParallelism);

var adjustByMaxParallelism =
inputShipStrategies.isEmpty() || inputShipStrategies.contains(HASH);
if (!adjustByMaxParallelism) {
return newParallelism;
return new VertexScalingResult(newParallelism, bottleneckScaleFactor, isBottleneck);
}

// When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
// adjust the parallelism such that it divides the maxParallelism without a remainder
// => data is evenly spread across subtasks
for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
if (maxParallelism % p == 0) {
return p;
return new VertexScalingResult(p, bottleneckScaleFactor, isBottleneck);
}
}

// If parallelism adjustment fails, use originally computed parallelism
return newParallelism;
return new VertexScalingResult(newParallelism, bottleneckScaleFactor, isBottleneck);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.SortedMap;

import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_EXECUTION_DISABLED_REASON;
Expand Down Expand Up @@ -220,39 +221,77 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
return Map.of();
}

var out = new HashMap<JobVertexID, ScalingSummary>();
var excludeVertexIdList =
context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
var scalingResult =
computeScalingSummaryInternal(
context, evaluatedMetrics, scalingHistory, restartTime, jobTopology, 1.0);

if (scalingResult.getBottlenecks().isEmpty()
|| !context.getConfiguration()
.getBoolean(PROCESSING_RATE_BACKPROPAGATION_ENABLED)) {
return scalingResult.getScalingSummaries();
}

LOG.info("Vertices with ids {} are bottlenecks", scalingResult.getBottlenecks());

double backpropagationScaleFactor = scalingResult.getBackpropagationScaleFactor();

LOG.info(
"Processing rate back propagation scaling factor is {}",
backpropagationScaleFactor);

scalingResult =
computeScalingSummaryInternal(
context,
evaluatedMetrics,
scalingHistory,
restartTime,
jobTopology,
backpropagationScaleFactor);

return scalingResult.getScalingSummaries();
}

IntermediateScalingResult computeScalingSummaryInternal(
Context context,
EvaluatedMetrics evaluatedMetrics,
Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory,
Duration restartTime,
JobTopology jobTopology,
double backpropagationScaleFactor) {

var scalingResult = new IntermediateScalingResult();
evaluatedMetrics
.getVertexMetrics()
.forEach(
(v, metrics) -> {
if (excludeVertexIdList.contains(v.toHexString())) {
LOG.debug(
"Vertex {} is part of `vertex.exclude.ids` config, Ignoring it for scaling",
v);
} else {
var currentParallelism =
(int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();

var newParallelism =
jobVertexScaler.computeScaleTargetParallelism(
context,
v,
jobTopology.get(v).getInputs().values(),
metrics,
scalingHistory.getOrDefault(
v, Collections.emptySortedMap()),
restartTime);
if (currentParallelism != newParallelism) {
out.put(
var currentParallelism =
(int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();

var newParallelism =
jobVertexScaler.computeScaleTargetParallelism(
context,
v,
new ScalingSummary(
currentParallelism, newParallelism, metrics));
}
jobTopology.get(v).getInputs().values(),
metrics,
scalingHistory.getOrDefault(
v, Collections.emptySortedMap()),
restartTime,
backpropagationScaleFactor);
if (currentParallelism != newParallelism.getParallelism()) {
scalingResult.addScalingSummary(
v,
new ScalingSummary(
currentParallelism,
newParallelism.getParallelism(),
metrics));
}
// Even if parallelism didn't change, vertex can be a bottleneck
if (newParallelism.isBottleneck()) {
scalingResult.addBottleneckVertex(
v, newParallelism.getBottleneckScaleFactor());
}
});
return out;
return scalingResult;
}

private boolean isJobUnderMemoryPressure(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.autoscaler;

import lombok.AllArgsConstructor;
import lombok.Getter;

/** Class for storing information on how a single vertex is scaled. */
@AllArgsConstructor
@Getter
public class VertexScalingResult {
private int parallelism;
private double bottleneckScaleFactor;
private boolean isBottleneck;

public static VertexScalingResult normalScaling(int parallelism) {
return new VertexScalingResult(parallelism, 1.0, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
.withDescription(
"Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs.");

public static final ConfigOption<Boolean> PROCESSING_RATE_BACKPROPAGATION_ENABLED =
autoScalerConfig("processing.rate.backpropagation.enabled")
.booleanType()
.defaultValue(false)
.withFallbackKeys(
oldOperatorConfigKey("processing.rate.backpropagation.enabled"))
.withDescription(
"Enable backpropagation of processing rate during autoscaling to reduce resources usage.");

public static final ConfigOption<Duration> METRICS_WINDOW =
autoScalerConfig("metrics.window")
.durationType()
Expand Down Expand Up @@ -313,7 +322,7 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
.defaultValues()
.withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids"))
.withDescription(
"A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
"A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling.");

public static final ConfigOption<Duration> SCALING_EVENT_INTERVAL =
autoScalerConfig("scaling.event.interval")
Expand Down
Loading

0 comments on commit d25a4fc

Please sign in to comment.