
[FLINK-31215] [autoscaler] Backpropagate processing rate limits from non-scalable bottlenecks to upstream operators #847

Open
wants to merge 1 commit into base: main

Conversation

aplyusnin

What is the purpose of the change

This pull request adds logic for backpropagating the processing rate from non-scalable bottlenecks to upstream operators, potentially reducing the parallelism of backpressured vertices after scaling.

Brief change log

  • Introduce an option for enabling backpropagation checks during autoscaling
  • Update scaling functions to determine potential bottlenecks
  • Scale the target capacity of each vertex by a coefficient
  • The coefficient is chosen so that the job's bottlenecks are scaled as much as possible without exceeding their max parallelism

Verifying this change

This change adds tests and can be verified as follows:

  • Extended existing tests in JobVertexScalerTest to check the updated vertex-exclusion logic and the effects of the backpropagation scale factor
  • Extended ScalingExecutorTest with tests covering backpropagation on different jobs and vertex exclusion
  • Manually verified on jobs with different max parallelism configurations that cause bottlenecks to appear, and with different sets of excluded vertices

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., are there any changes to the CustomResourceDescriptors: no
  • Core observer or reconciler logic that is regularly executed: no

Documentation

@aplyusnin
Author

Hi, @gyfora, could you review the code and run the workflows, please?

@@ -58,6 +58,15 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
.withDescription(
"Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs.");

public static final ConfigOption<Boolean> PROCESSING_RATE_BACKPROPAGATION_ENABLED =
autoScalerConfig("processing.rate.backpropagation.enabled")
Contributor

We could call this simply bottleneck-propagation.enabled and use bottleneck-propagation.allow-scale-down to control the scaling.
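
For illustration, here is a minimal sketch of how the renamed options could be declared, assuming the same autoScalerConfig builder pattern shown in the diff above; the constant names and descriptions are only placeholders, not the PR's actual code:

// Sketch only: option keys follow the naming suggested in this comment.
public static final ConfigOption<Boolean> BOTTLENECK_PROPAGATION_ENABLED =
        autoScalerConfig("bottleneck-propagation.enabled")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Enable backpropagation of processing rate limits from bottlenecks to upstream vertices.");

public static final ConfigOption<Boolean> BOTTLENECK_PROPAGATION_SCALE_DOWN_ENABLED =
        autoScalerConfig("bottleneck-propagation.allow-scale-down")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Allow scaling down of vertices during bottleneck propagation.");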

Author

Done

Contributor

The 2 config names are not in sync now

@mxm
Contributor

mxm commented Jul 5, 2024

Thanks for the PR @aplyusnin! I'll take a look.


void addBottleneckVertex(JobVertexID bottleneck, double factor) {
    bottlenecks.add(bottleneck);
    backpropagationScaleFactor = Math.min(backpropagationScaleFactor, factor);
}
Contributor

Why is the scaling factor not kept on a per-vertex level? If there are two vertices within different branches, they will influence each other, e.g. a propagation factor of 0.1 will override another of 0.9. I think we need to account for it per input, similarly to how we propagate the actual rates.

Author

The scale factor works as follows: the minimum possible value is picked and pushed towards the sources, lowering the target capacity of every vertex by this factor. This approach works fine for most jobs:

  1. Starting from a bottleneck vertex, the capacity of all its upstream vertices is reduced by the factor
  2. During propagation, source operators are reached and their capacity is reduced
  3. This also affects vertices that may not be directly connected to the initial bottleneck
  4. Repeating steps 2 and 3 adjusts all vertices in the connected component

I think the case with two or more connected components (e.g. the graph source1 -> op1 -> sink1; source2 -> op2 -> sink2) is rare.

As an alternative, bottlenecks could be processed in decreasing order of their scale factor, making propagation more accurate, but that takes more time for scaling (O(N^2) instead of O(N)), is harder to maintain, and is less predictable.

What do you think? Should we use more complex logic for propagation?
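
For clarity, a minimal sketch of the single-global-factor approach described above, using illustrative names only (not the PR's actual classes):

import java.util.Map;
import org.apache.flink.runtime.jobgraph.JobVertexID;

// Sketch only: pick the smallest factor across all bottlenecks and lower
// every vertex's target capacity by that one factor.
static void applyGlobalFactor(
        Map<JobVertexID, Double> targetCapacities,
        Map<JobVertexID, Double> bottleneckFactors) {
    double globalFactor = 1.0;
    for (double factor : bottleneckFactors.values()) {
        // Same accumulation as addBottleneckVertex in the diff above.
        globalFactor = Math.min(globalFactor, factor);
    }
    final double scale = globalFactor;
    targetCapacities.replaceAll((vertex, capacity) -> capacity * scale);
}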

Contributor

I think we might also have to consider the output ratios when propagating the bottleneck backwards.

So technically speaking, if we want to be completely precise, we can do this in a single pass if we start computing the target rates from the sinks. Once the actual scaled rate is computed, we have to propagate the diff compared to the original one back to the upstream tasks based on the output ratio.

Contributor

I think Gyula has a point. The current implementation works for simple jobs but there are many jobs with more complex uncorrelated branches which would lead to unnecessary scale downs or prevent upscales (if scale down is disabled). Using the output ratios would allow us to precisely feed back the bottleneck ratios and avoid any accidental backpropagation.

@aplyusnin
Author

Hi @mxm, @gyfora. I rewrote the code for the processing rate backpropagation. Unfortunately, the backpropagation results depend on the order in which vertices are processed, and Flink's default topological order may not be the best. Also, I decided to update the target data rate metrics during backpropagation to make the code more compact. Is that ok?

@gyfora
Contributor

gyfora commented Jul 11, 2024

Hi @mxm, @gyfora. I rewrote the code for the processing rate backpropagation. Unfortunately, the backpropagation results depend on the order in which vertices are processed, and Flink's default topological order may not be the best. Also, I decided to update the target data rate metrics during backpropagation to make the code more compact. Is that ok?

I think the backpropagation has to go in reverse topological order (i.e. from sinks to sources) and then it should be stable. Am I missing something @aplyusnin?

@aplyusnin
Author

Yes, @gyfora, you are right. Now, backpropagation considers vertices in reverse topological order. 

@aplyusnin
Author

Hi, @mxm @gyfora! I've finished the implementation. Could you review the code and run workflows?

Contributor

@gyfora gyfora left a comment

I started to review the code but it's very hard for me to understand the actual logic just by looking at it.

I am a bit puzzled by the concept of back propagation factors, scale factors etc. per vertex and things like that.

In my head I am looking for a much simpler logic such as:

actualTargetProcessingRate = min(targetProcessingRate, max(downstream_target_rate / output_ratio))

Basically, for each vertex we check whether it has any downstream vertex with a target capacity that would backpressure it, and then adjust the target rate.

I don't see why we need factors / multipliers, etc.
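
For illustration, a minimal sketch of this simpler rule for a vertex with a single downstream consumer; the names are placeholders and the output ratio is taken to mean records sent downstream per record processed (both are assumptions, not the PR's code):

// Sketch only: cap the vertex's target processing rate by what its single
// downstream vertex can accept, translated through the output ratio.
static double adjustedTargetProcessingRate(
        double targetProcessingRate, // rate the vertex would need on its own
        double downstreamTargetRate, // rate the downstream vertex can handle
        double outputRatio) {        // records sent downstream per record processed
    return Math.min(targetProcessingRate, downstreamTargetRate / outputRatio);
}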

@aplyusnin
Author

Thank you for your reply.

Now, the backpropagation logic for a single vertex is the following:

  1. Adjust the target data rate by the factor propagated from downstream (processingRateCapacity and currentBackPropFactor)
  2. Update the backpropagation factor if the required parallelism (target data rate divided by the true processing rate, times the current parallelism) exceeds the vertex's max parallelism
  3. Evaluate the data rate coming from the direct upstream
  4. Backpropagate the factor to the direct upstream

For example, take a look at operator 3.

[figure: example job graph with target data rates before adjusting operator 3]

Initially, its target data rate was 250; it is lowered by the factor 0.8 propagated from downstream and becomes 200.
To process this whole data rate, the new parallelism would have to be 200 / 50 * 10 = 40 (target data rate / true processing rate * current parallelism).

This value is 2 times the max parallelism (40 / 20 = 2), so the factor to propagate further is 0.8 (received from downstream) * 20 / 40 (the vertex is a bottleneck) = 0.4.

[figure: the job graph after operator 3's factor is updated to 0.4]

Now it's time to propagate the factor to the direct upstream (operator 1 and operator 2). Note that operator 1 has already been adjusted by some other vertices.

First, the data rate coming from the direct upstream is evaluated (target data rate * output ratio * backpropagation factor): 100 * 2 * 0.5 = 100 from operator 1 and 50 * 1 * 1 = 50 from operator 2, summing up to 150.

Since the adjusted target data rate of operator 3 is 100 while the upstream provides 150, all direct upstream operators have to be lowered. To do this, their backpropagation factors are multiplied by 100 / 150 = 2/3 (target data rate / data rate from the upstream).

[figure: the job graph after the upstream factors are multiplied by 2/3]
Hope this example helps.

This process repeats for all vertices in reverse topological order. Then, the target data rate is updated using scale factors propagated to sources.

There are also some extra checks to prevent aggressive scaling down.
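
To make the walkthrough above concrete, here is a minimal sketch of the per-vertex factor update using the example's numbers; the method and parameter names are illustrative, not the PR's actual code:

// Sketch only: compute the factor a bottleneck vertex propagates further upstream.
static double updateBackPropFactor(
        double targetDataRate,       // 250 for operator 3
        double factorFromDownstream, // 0.8 received from downstream
        double trueProcessingRate,   // 50
        int parallelism,             // 10
        int maxParallelism) {        // 20
    // Step 1: adjust the target data rate: 250 * 0.8 = 200.
    double adjustedRate = targetDataRate * factorFromDownstream;
    // Step 2: required parallelism = 200 / 50 * 10 = 40, which exceeds maxParallelism = 20,
    // so the vertex is a bottleneck and the factor shrinks to 0.8 * 20 / 40 = 0.4.
    double requiredParallelism = adjustedRate / trueProcessingRate * parallelism;
    if (requiredParallelism > maxParallelism) {
        return factorFromDownstream * maxParallelism / requiredParallelism;
    }
    return factorFromDownstream;
}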

Contributor

@mxm mxm left a comment

Thanks for the update to the PR @aplyusnin! Could you explain why the logic you described is required over the simpler logic Gyula outlined? It does not seem necessary to propagate the load factors.

In my eyes, it would be sufficient to do a single pass in reverse topological order (i.e. from sinks to sources), where we limit upstream vertices by the rate limit established downstream via the initial scaling logic. This could be done in a recursive fashion.

It doesn't matter to the upstream vertices what the downstream backpropagation factors are, because the rates dictate how much the vertices will be scaled. The backpropagation factor is only relevant locally, to apply a limit to the vertex's rate.

Comment on lines +166 to +171
double averageTrueProcessingRate =
        evaluatedMetrics
                .getVertexMetrics()
                .get(vertex)
                .get(TRUE_PROCESSING_RATE)
                .getAverage();
Contributor

Why is this metric required? Isn't TARGET_DATA_RATE the relevant metric, sufficient to apply the backpropagation factor?

@aplyusnin
Author

Thank you for your replies!

I don't understand how we can determine if a vertex is a bottleneck without evaluating its parallelism. This is why TRUE_PROCESSING_RATE is used.

Also, I think that using the simpler approach is not accurate enough. Suppose we have a window join operator with two upstreams. Its target_data_rate is calculated as:

target_data_rate_join = target_data_rate_upstream_1 + target_data_rate_upstream_2 (output ratios are 1 for simplicity).

If the join operator is a bottleneck, then its actual_target_data_rate_join is lower than target_data_rate_join. Then, by the backpropagation rule, the new actual_target_data_rates of upstream_1 and upstream_2 are limited by actual_target_data_rate_join.

This is where problems with accuracy appear.

The actual_target_data_rate_join can still be greater than target_data_rate_upstream_1 or target_data_rate_upstream_2. In that case, the upstreams' target_data_rate remains unchanged.

Alternatively, the actual_target_data_rate_join can be less than the upstreams' target_data_rate, making both of them equal to actual_target_data_rate_join. But then the data rate arriving at the join will be two times greater than its adjusted target.

In both cases, the upstream_1 and upstream_2 operators will remain backpressured after scaling. This is why the simpler approach may not be good enough.
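
As a hypothetical numeric illustration (numbers invented for this example): let target_data_rate_upstream_1 = target_data_rate_upstream_2 = 100, so target_data_rate_join = 200.

  • Case 1: actual_target_data_rate_join = 150. Since 150 > 100, neither upstream is limited, and the join still receives 100 + 100 = 200 > 150.
  • Case 2: actual_target_data_rate_join = 80. Both upstreams are limited to 80, yet the join still receives 80 + 80 = 160, twice its adjusted target.

In both cases the join stays backpressured unless the upstream targets are lowered so that their sum matches the join's adjusted target.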

@mxm
Contributor

mxm commented Jul 22, 2024

The actual_target_data_rate_join can still be greater than target_data_rate_upstream_1 or target_data_rate_upstream_2. In that case, the upstreams' target_data_rate remains unchanged.

Alternatively, the actual_target_data_rate_join can be less than the upstreams' target_data_rate, making both of them equal to actual_target_data_rate_join. But then the data rate arriving at the join will be two times greater than its adjusted target.

In both cases, the upstream_1 and upstream_2 operators will remain backpressured after scaling. This is why the simpler approach may not be good enough.

I think it can work if we apply the same logic that we used to determine target_data_rate_join. As you pointed out, we determined the target data rate via:

actual_data_rate_join = actual_data_rate_upstream_1 + actual_data_rate_upstream_2

Consequently, we would need to satisfy the following equation for the backpropagation:

target_data_rate_join = target_data_rate_upstream_1 + target_data_rate_upstream_2

That would mean that each input vertex gets the following limit applied:

actual_data_rate_upstream_i = target_data_rate_upstream_i - (target_data_rate_join - actual_target_data_rate_join) / N

where N is the number of inputs.

Do you think that would work? The benefit of this approach is that we leverage all the available information without having to add and backfeed additional factors.
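
A hypothetical worked example of this proposal (numbers invented here): with target_data_rate_upstream_1 = target_data_rate_upstream_2 = 100, target_data_rate_join = 200, actual_target_data_rate_join = 150 and N = 2 inputs, each upstream would be limited to

actual_data_rate_upstream_i = 100 - (200 - 150) / 2 = 75

and the adjusted upstream rates sum to 75 + 75 = 150, exactly the join's limit.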
