
[FLINK-35265] Implement FlinkStateSnapshot custom resource #821

Merged 4 commits into apache:main on Aug 8, 2024

Conversation

mateczagany
Contributor

@mateczagany commented Apr 29, 2024

What is the purpose of the change

Implement FlinkStateSnapshot according to FLIP-446. This PR does not include the e2e tests and documentation.

Brief change log

  • Added FlinkStateSnapshot and all its dependent classes to flink-kubernetes-operator-api
  • Deprecated several fields in FlinkDeployment/FlinkSessionJob as accepted in the FLIP
  • Refactored several methods in FlinkService to extract the logic of saving snapshot path to other classes
  • Added a check to the FlinkConfigManager class that verifies at runtime whether the FlinkStateSnapshot CR can be created on the current Kubernetes server. This is intended to be temporary, to ensure a smooth upgrade process.
  • Refactored metric- and status-related classes to be able to handle the new CR

Verifying this change

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., are there any changes to the CustomResourceDescriptors: yes
  • Core observer or reconciler logic that is regularly executed: yes

Documentation

Other implementation details

  • For periodic snapshots, the Operator will create new FlinkStateSnapshot CRs, and the snapshot will be taken when that resource is reconciled.
  • For upgrade snapshots, the Operator will create a new FlinkStateSnapshot CR, marking it with alreadyExists.
  • For checkpoints, the Operator will try to fetch the checkpoint's final path after it has been marked as successful. If fetching the path fails, the checkpoint will still be marked as COMPLETED, but with an empty path.
  • A new configuration (kubernetes.operator.savepoint.dispose-on-delete) was also added that was not specified in the FLIP.
  • In contrast to the FLIP, jobReference will contain an optional field namespace. If null, this will default to the namespace of the snapshot.
  • In contrast to the FLIP, CheckpointSpec#type is removed as the only supported manual checkpoint type from Flink 1.19 is FULL.
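As an illustration of the points above, a savepoint-type FlinkStateSnapshot CR could look roughly like the following. This is a sketch based on my reading of the FLIP and this discussion; the apiVersion and exact field names are assumptions, not copied from the PR:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkStateSnapshot
metadata:
  name: example-savepoint
  namespace: default
spec:
  jobReference:
    kind: FlinkDeployment
    name: example-deployment
    # namespace is optional and defaults to the snapshot's own namespace
  savepoint:
    # default comes from kubernetes.operator.savepoint.dispose-on-delete
    disposeOnDelete: false
```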

Other PRs

  • Metrics for snapshots will be implemented in FLINK-35492.
  • Configurable max history age/count and auto-cleanup will be implemented in FLINK-35493.

@mateczagany force-pushed the FLINK-35265 branch 19 times, most recently from b7c7644 to 6dbb319 on May 30, 2024 12:10
@mateczagany marked this pull request as ready for review May 30, 2024 12:57
Contributor

@gyfora left a comment

I started reviewing this, added some initial comments.

In the PR description you mentioned that with the new mode the manual trigger nonce doesn't work anymore. Would it make sense to keep it working for backward compatibility (when the nonce changes, simply create a snapshot resource)?
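A minimal sketch of the backward-compatibility idea above (a hypothetical helper, not code from this PR): detect a change of the legacy savepointTriggerNonce and let the operator translate it into creating a FlinkStateSnapshot resource instead of the old trigger path.

```java
// Hypothetical sketch of the nonce-based backward compatibility discussed
// above; not the operator's actual code.
public class NonceCompat {

    /**
     * Returns true when the nonce in the spec changed since the last
     * reconciled value, i.e. the operator should create a new
     * FlinkStateSnapshot resource for the referenced job.
     */
    public static boolean nonceChanged(Long specNonce, Long lastReconciledNonce) {
        return specNonce != null && !specNonce.equals(lastReconciledNonce);
    }

    public static void main(String[] args) {
        System.out.println(nonceChanged(2L, 1L));   // true: nonce bumped, trigger snapshot
        System.out.println(nonceChanged(null, 1L)); // false: nonce cleared, no trigger
    }
}
```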

@mateczagany force-pushed the FLINK-35265 branch 3 times, most recently from 5c78025 to 0f094f3 on July 5, 2024 19:20
Contributor

@gyfora left a comment

I think we need some extra logic to handle savepoint job submissions and snapshot info during upgrades.

There is currently a bit of implicit logic in how the lastSavepoint is updated every time we submit a job from a savepoint, and we need some new logic to capture that (see my other long comment).

@gyfora
Contributor

gyfora commented Jul 7, 2024

A small addition to the previous comments: the SnapshotObserver currently also observes and updates the lastSavepointInfo for terminal jobs. This is a key mechanism for handling failures during stateful upgrades.

So we have to update that logic as well, so that instead of recording the lastSavepoint status we use new shared logic with the snapshot custom resource mechanism. This mechanism is currently covered by some tests; enabling the snapshot CR for those tests would probably help reproduce the problem. For example:

  1. Introduce a failure after executing a stop-with-savepoint operation (before the snapshot CR was created)
  2. This is the point where the observer would actually observe the savepoint/checkpoint info from the terminal job and update the status
  3. Assert that the upgrade is actually executed from the correct savepoint

I believe with the current implementation the savepoint info would be lost.

Contributor

@ferenc-csaky left a comment

I did some testing myself as well, and I think this is getting close to a good and mergeable shape! I did not check through the code yet, but added 2 local comments. In general, I did some testing with the StateMachineExample tutorial:

  1. Deployed a fresh job
  2. Created checkpoint via CRD (1 specific comment about INCREMENTAL)
  3. Created savepoint via CRD
  4. Restored job from savepoint

So far the main functionality looks good; I plan to go through the files in the next couple of days.

Contributor

@ferenc-csaky left a comment

Second part of my review, hopefully I will have some time tomorrow to finish and check the rest of the classes.

@anupamaggarwal
Contributor

Thanks for contributing this super useful feature @mateczagany 👍!
I am still new to the operator codebase, so please feel free to ignore anything that is not relevant. I had some initial questions:

  1. IIUC, if a savepoint CR has a job reference to an invalid job (one that does not exist or is not running), we would set the status field to ABANDONED. Does it also make sense to forbid snapshot CR creation altogether in these cases through the validation webhook?

  2. Would it be possible to add an example FlinkStateSnapshot CR to the examples folder (similar to basic.yaml referenced in the quickstart guide)? Please ignore if you were planning to cover this as part of the documentation PR.

  3. Does it also make sense to include some of the new configs (for example snapshot.resource.enabled) in helm/*/conf/flink-conf.yaml with the default values commented out? I was wondering if this would give more signal to first-time users of this feature, but it seems we currently expose only a subset of operator properties, so I wasn't entirely sure.

@mateczagany
Contributor Author

Thanks for your feedback @anupamaggarwal, really appreciate it!

  1. I agree, and this is what I expected to happen. Except it does not, because I forgot to add the FlinkStateSnapshot resource to the ValidatingWebhookConfiguration resource in the Helm chart, I will push a commit soon.
  2. This PR is already quite huge, so I have decided to do the examples/docs in a separate PR, there is currently a draft for it: [FLINK-35267][snapshot] FlinkStateSnapshot documentation and examples #854
  3. You are right, adding the config there would improve its visibility to the end-users, so I will do that in a future commit, thank you!

@@ -72,21 +74,26 @@ public class FlinkConfigManager {

private volatile Configuration defaultConfig;
private volatile FlinkOperatorConfiguration defaultOperatorConfiguration;
private final boolean snapshotCrdInstalled;
Contributor

@ferenc-csaky commented Jul 24, 2024

As we discussed offline, the whole snapshot CRD install check is planned for 1 release to give more meaningful errors for users the first time they upgrade to an operator version that includes this feature, but will be useless after that. So I think it would make sense to mark this with a TODO to remove it in the 1.11 release.

Contributor Author

Added the note, thank you. I am going to also note here so others can see:

This field will be determined in FlinkOperator when the operator starts, by checking if FlinkStateSnapshot CRD is installed on the Kubernetes cluster.
When generating the default configuration for the operator, kubernetes.operator.snapshot.resource.enabled will be set to false in this class if snapshotCrdInstalled is false, to allow users to continue using the operator if they upgraded from a previous version but did not install the new FlinkStateSnapshot CRD.

This check and field are planned to be removed in the release after this FLIP is released (so hopefully 1.11, if this PR is released in 1.10).
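The defaulting described above can be sketched as follows. This is purely illustrative, not the actual FlinkConfigManager code; only the config key name is taken from the comment:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the defaulting described above: if the
// FlinkStateSnapshot CRD is not installed on the cluster, force the feature
// flag off so operators upgraded from a previous version keep working.
public class SnapshotDefaulting {
    static final String SNAPSHOT_RESOURCES_ENABLED =
            "kubernetes.operator.snapshot.resource.enabled";

    public static Map<String, String> applyDefaults(
            Map<String, String> conf, boolean snapshotCrdInstalled) {
        Map<String, String> result = new HashMap<>(conf);
        if (!snapshotCrdInstalled) {
            // CRD missing: disable the feature regardless of user config
            result.put(SNAPSHOT_RESOURCES_ENABLED, "false");
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(SNAPSHOT_RESOURCES_ENABLED, "true");
        System.out.println(applyDefaults(conf, false)); // flag forced to "false"
    }
}
```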

.rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis());
case COMPLETED:
var flinkDeployment = getFlinkDeployment(ctx);
return handleSnapshotCleanup(resource, flinkDeployment, ctx);
Contributor

@anupamaggarwal commented Jul 25, 2024

I was trying out the periodic snapshot feature today. Does it make sense to introduce a feature to auto-clean the flinkstatesnapshots.flink.apache.org CRs and only retain up to some max retention limit (we could still retain the savepoint dir)? IIUC disposeOnDelete for periodic savepoints is set to false by default, so it should be relatively safe to do so.

If the user sets the savepoint interval kubernetes.operator.periodic.savepoint.interval to a low value, and there are multiple jobs on the cluster it would lead to creation of many snapshot CRs. I am not familiar with K8s internals but could this increase the load on the API server?

Contributor Author

Definitely! I would not feel comfortable releasing a new operator version that does not have auto-cleanup of FlinkStateSnapshot CRs. For the now-deprecated way of taking snapshots this could be configured using kubernetes.operator.savepoint.history.max.age and kubernetes.operator.savepoint.history.max.count.

You are right, disposeOnDelete is false by default, and it can be controlled by the config kubernetes.operator.savepoint.dispose-on-delete.

I am not sure of the exact approach I will take to handle cleanup, but it will be implemented under FLINK-35493 in another PR. That's a feature I felt we could easily separate to another PR, as this one is already growing quite big :D

With a low periodic interval, there won't be any more load on the API server and etcd than tracking snapshots in the Flink resource CR (as the operator does currently). Periodically cleaning them up might affect the load on API server/etcd depending on what solution we will go with. But that will be done under FLINK-35493.
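For reference, the kind of max-age/max-count retention policy mentioned above could be sketched like this. This is only an illustration of the semantics of the legacy kubernetes.operator.savepoint.history.max.age and max.count settings; FLINK-35493 may take a different approach:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative retention sketch, not the FLINK-35493 implementation: keep at
// most maxCount snapshots and drop any completed earlier than now - maxAge.
public class SnapshotRetention {

    /** Returns the completion times of snapshots that should be deleted. */
    public static List<Instant> toDelete(
            List<Instant> completionTimes, int maxCount, Duration maxAge, Instant now) {
        List<Instant> sorted = new ArrayList<>(completionTimes);
        sorted.sort(Comparator.reverseOrder()); // newest first
        Instant cutoff = now.minus(maxAge);
        List<Instant> doomed = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            // delete if beyond the count limit or older than the age limit
            if (i >= maxCount || sorted.get(i).isBefore(cutoff)) {
                doomed.add(sorted.get(i));
            }
        }
        return doomed;
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        List<Instant> times = List.of(now.minusSeconds(60), now.minusSeconds(172800));
        // with maxCount=5 and maxAge=1 day, only the 2-day-old snapshot is dropped
        System.out.println(toDelete(times, 5, Duration.ofDays(1), now).size()); // 1
    }
}
```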

Contributor Author

I have done some investigation, and I think I will be able to easily implement this in the observe phase of the FlinkDeployment and FlinkSessionJob controllers, just like the operator currently cleans up savepoints. I could add a new InformerEventSource so we would be able to list FlinkStateSnapshot resources as secondary resources when reconciling FlinkDeployment/FlinkSessionJob resources, without querying the Kubernetes API server.

Once this PR is merged, I will create a new PR for this functionality.

@mateczagany
Contributor Author

I rebased the branch on top of main, and I also pushed commit 566caf9.

@gyfora
Contributor

gyfora commented Aug 6, 2024

@anupamaggarwal @ferenc-csaky unless there are any concerns, I would like to merge this. Afterwards we can focus on the follow-up items and even get a chance to parallelize this work.

@mateczagany
Contributor Author

Thank you @gyfora, I have done a lot of manual testing and I am quite happy with the current state of the PR. Once this is merged, I will start working on the other tickets in this FLIP and hopefully create new PRs this week for the following:

  • Docs
  • E2E test case
  • Snapshot history automatic cleanup
  • FlinkStateSnapshot metrics integration

I will be unavailable for ~3 weeks starting from Aug 23; I hope to have the remaining PRs ready and in a good state by then.

@gyfora merged commit 9819665 into apache:main on Aug 8, 2024
169 checks passed