diff --git a/pkg/apis/app/v1beta1/types.go b/pkg/apis/app/v1beta1/types.go index 35a20dd1..14e47f37 100644 --- a/pkg/apis/app/v1beta1/types.go +++ b/pkg/apis/app/v1beta1/types.go @@ -168,19 +168,37 @@ type FlinkJobStatus struct { } type FlinkApplicationStatus struct { - Phase FlinkApplicationPhase `json:"phase"` - StartedAt *metav1.Time `json:"startedAt,omitempty"` - LastUpdatedAt *metav1.Time `json:"lastUpdatedAt,omitempty"` - Reason string `json:"reason,omitempty"` - ClusterStatus FlinkClusterStatus `json:"clusterStatus,omitempty"` - JobStatus FlinkJobStatus `json:"jobStatus"` - FailedDeployHash string `json:"failedDeployHash,omitempty"` - RollbackHash string `json:"rollbackHash,omitempty"` - DeployHash string `json:"deployHash"` - SavepointTriggerID string `json:"savepointTriggerId,omitempty"` - SavepointPath string `json:"savepointPath,omitempty"` - RetryCount int32 `json:"retryCount,omitempty"` - LastSeenError *FlinkApplicationError `json:"lastSeenError,omitempty"` + Phase FlinkApplicationPhase `json:"phase"` + StartedAt *metav1.Time `json:"startedAt,omitempty"` + LastUpdatedAt *metav1.Time `json:"lastUpdatedAt,omitempty"` + Reason string `json:"reason,omitempty"` + DeployVersion string `json:"deployVersion,omitempty"` + UpdatingVersion string `json:"updatingVersion,omitempty"` + // To ensure backward compatibility, repeat ClusterStatus and JobStatus + ClusterStatus FlinkClusterStatus `json:"clusterStatus,omitempty"` + JobStatus FlinkJobStatus `json:"jobStatus,omitempty"` + VersionStatuses []FlinkApplicationVersionStatus `json:"versionStatuses,omitempty"` + FailedDeployHash string `json:"failedDeployHash,omitempty"` + RollbackHash string `json:"rollbackHash,omitempty"` + DeployHash string `json:"deployHash"` + SavepointTriggerID string `json:"savepointTriggerId,omitempty"` + SavepointPath string `json:"savepointPath,omitempty"` + RetryCount int32 `json:"retryCount,omitempty"` + LastSeenError *FlinkApplicationError `json:"lastSeenError,omitempty"` +} + +type 
FlinkApplicationVersion string + +const ( + BlueFlinkApplication FlinkApplicationVersion = "Blue" + GreenFlinkApplication FlinkApplicationVersion = "Green" +) + +type FlinkApplicationVersionStatus struct { + Version FlinkApplicationVersion `json:"appVersion,omitempty"` + VersionHash string `json:"versionHash,omitempty"` + ClusterStatus FlinkClusterStatus `json:"clusterStatus,omitempty"` + JobStatus FlinkJobStatus `json:"jobStatus,omitempty"` } func (in *FlinkApplicationStatus) GetPhase() FlinkApplicationPhase { @@ -226,6 +244,8 @@ const ( FlinkApplicationRecovering FlinkApplicationPhase = "Recovering" FlinkApplicationRollingBackJob FlinkApplicationPhase = "RollingBackJob" FlinkApplicationDeployFailed FlinkApplicationPhase = "DeployFailed" + FlinkApplicationDualRunning FlinkApplicationPhase = "DualRunning" + FlinkApplicationTeardown FlinkApplicationPhase = "Teardown" ) var FlinkApplicationPhases = []FlinkApplicationPhase{ @@ -240,17 +260,35 @@ var FlinkApplicationPhases = []FlinkApplicationPhase{ FlinkApplicationRecovering, FlinkApplicationDeployFailed, FlinkApplicationRollingBackJob, + FlinkApplicationDualRunning, + FlinkApplicationTeardown, } func IsRunningPhase(phase FlinkApplicationPhase) bool { return phase == FlinkApplicationRunning || phase == FlinkApplicationDeployFailed } +func IsBlueGreenDeploymentMode(mode DeploymentMode) bool { + // Backward compatibility: the pre-existing Dual mode is never treated as BlueGreen + if mode == DeploymentModeDual { + return false + } + return mode == DeploymentModeBlueGreen +} + +func GetMaxRunningJobs(mode DeploymentMode) int32 { + if IsBlueGreenDeploymentMode(mode) { + return int32(2) + } + return int32(1) +} + type DeploymentMode string const ( - DeploymentModeSingle DeploymentMode = "Single" - DeploymentModeDual DeploymentMode = "Dual" + DeploymentModeSingle DeploymentMode = "Single" + DeploymentModeDual DeploymentMode = "Dual" + DeploymentModeBlueGreen DeploymentMode = "BlueGreen" ) type DeleteMode string diff --git
a/pkg/apis/app/v1beta1/zz_generated.deepcopy.go b/pkg/apis/app/v1beta1/zz_generated.deepcopy.go index 1b23bd3a..b0e1c005 100644 --- a/pkg/apis/app/v1beta1/zz_generated.deepcopy.go +++ b/pkg/apis/app/v1beta1/zz_generated.deepcopy.go @@ -207,6 +207,13 @@ func (in *FlinkApplicationStatus) DeepCopyInto(out *FlinkApplicationStatus) { } out.ClusterStatus = in.ClusterStatus in.JobStatus.DeepCopyInto(&out.JobStatus) + if in.VersionStatuses != nil { + in, out := &in.VersionStatuses, &out.VersionStatuses + *out = make([]FlinkApplicationVersionStatus, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.LastSeenError != nil { in, out := &in.LastSeenError, &out.LastSeenError *out = new(FlinkApplicationError) @@ -225,6 +232,24 @@ func (in *FlinkApplicationStatus) DeepCopy() *FlinkApplicationStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlinkApplicationVersionStatus) DeepCopyInto(out *FlinkApplicationVersionStatus) { + *out = *in + out.ClusterStatus = in.ClusterStatus + in.JobStatus.DeepCopyInto(&out.JobStatus) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlinkApplicationVersionStatus. +func (in *FlinkApplicationVersionStatus) DeepCopy() *FlinkApplicationVersionStatus { + if in == nil { + return nil + } + out := new(FlinkApplicationVersionStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *FlinkClusterStatus) DeepCopyInto(out *FlinkClusterStatus) { *out = *in diff --git a/pkg/client/clientset/versioned/typed/app/v1alpha1/app_client.go b/pkg/client/clientset/versioned/typed/app/v1beta2/app_client.go similarity index 55% rename from pkg/client/clientset/versioned/typed/app/v1alpha1/app_client.go rename to pkg/client/clientset/versioned/typed/app/v1beta2/app_client.go index 0b5e3fff..5fdf3da5 100644 --- a/pkg/client/clientset/versioned/typed/app/v1alpha1/app_client.go +++ b/pkg/client/clientset/versioned/typed/app/v1beta2/app_client.go @@ -1,30 +1,30 @@ // Code generated by client-gen. DO NOT EDIT. -package v1alpha1 +package v1beta1 import ( - v1alpha1 "github.com/lyft/flinkk8soperator/pkg/apis/app/v1alpha1" + v1beta1 "github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1" "github.com/lyft/flinkk8soperator/pkg/client/clientset/versioned/scheme" serializer "k8s.io/apimachinery/pkg/runtime/serializer" rest "k8s.io/client-go/rest" ) -type FlinkV1alpha1Interface interface { +type FlinkV1beta2Interface interface { RESTClient() rest.Interface FlinkApplicationsGetter } -// FlinkV1alpha1Client is used to interact with features provided by the flink.k8s.io group. -type FlinkV1alpha1Client struct { +// FlinkV1beta2Client is used to interact with features provided by the flink.k8s.io group. +type FlinkV1beta2Client struct { restClient rest.Interface } -func (c *FlinkV1alpha1Client) FlinkApplications(namespace string) FlinkApplicationInterface { +func (c *FlinkV1beta2Client) FlinkApplications(namespace string) FlinkApplicationInterface { return newFlinkApplications(c, namespace) } -// NewForConfig creates a new FlinkV1alpha1Client for the given config. -func NewForConfig(c *rest.Config) (*FlinkV1alpha1Client, error) { +// NewForConfig creates a new FlinkV1beta2Client for the given config. 
+func NewForConfig(c *rest.Config) (*FlinkV1beta2Client, error) { config := *c if err := setConfigDefaults(&config); err != nil { return nil, err @@ -33,12 +33,12 @@ func NewForConfig(c *rest.Config) (*FlinkV1alpha1Client, error) { if err != nil { return nil, err } - return &FlinkV1alpha1Client{client}, nil + return &FlinkV1beta2Client{client}, nil } -// NewForConfigOrDie creates a new FlinkV1alpha1Client for the given config and +// NewForConfigOrDie creates a new FlinkV1beta2Client for the given config and // panics if there is an error in the config. -func NewForConfigOrDie(c *rest.Config) *FlinkV1alpha1Client { +func NewForConfigOrDie(c *rest.Config) *FlinkV1beta2Client { client, err := NewForConfig(c) if err != nil { panic(err) @@ -46,13 +46,13 @@ func NewForConfigOrDie(c *rest.Config) *FlinkV1alpha1Client { return client } -// New creates a new FlinkV1alpha1Client for the given RESTClient. -func New(c rest.Interface) *FlinkV1alpha1Client { - return &FlinkV1alpha1Client{c} +// New creates a new FlinkV1beta2Client for the given RESTClient. +func New(c rest.Interface) *FlinkV1beta2Client { + return &FlinkV1beta2Client{c} } func setConfigDefaults(config *rest.Config) error { - gv := v1alpha1.SchemeGroupVersion + gv := v1beta1.SchemeGroupVersion config.GroupVersion = &gv config.APIPath = "/apis" config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs} @@ -66,7 +66,7 @@ func setConfigDefaults(config *rest.Config) error { // RESTClient returns a RESTClient that is used to communicate // with API server by this client implementation. 
-func (c *FlinkV1alpha1Client) RESTClient() rest.Interface { +func (c *FlinkV1beta2Client) RESTClient() rest.Interface { if c == nil { return nil } diff --git a/pkg/client/clientset/versioned/typed/app/v1alpha1/doc.go b/pkg/client/clientset/versioned/typed/app/v1beta2/doc.go similarity index 86% rename from pkg/client/clientset/versioned/typed/app/v1alpha1/doc.go rename to pkg/client/clientset/versioned/typed/app/v1beta2/doc.go index 93a7ca4e..897c0995 100644 --- a/pkg/client/clientset/versioned/typed/app/v1alpha1/doc.go +++ b/pkg/client/clientset/versioned/typed/app/v1beta2/doc.go @@ -1,4 +1,4 @@ // Code generated by client-gen. DO NOT EDIT. // This package has the automatically generated typed clients. -package v1alpha1 +package v1beta1 diff --git a/pkg/client/clientset/versioned/typed/app/v1alpha1/fake/doc.go b/pkg/client/clientset/versioned/typed/app/v1beta2/fake/doc.go similarity index 100% rename from pkg/client/clientset/versioned/typed/app/v1alpha1/fake/doc.go rename to pkg/client/clientset/versioned/typed/app/v1beta2/fake/doc.go diff --git a/pkg/client/clientset/versioned/typed/app/v1alpha1/fake/fake_app_client.go b/pkg/client/clientset/versioned/typed/app/v1beta2/fake/fake_app_client.go similarity index 55% rename from pkg/client/clientset/versioned/typed/app/v1alpha1/fake/fake_app_client.go rename to pkg/client/clientset/versioned/typed/app/v1beta2/fake/fake_app_client.go index 017abeb5..d32b1655 100644 --- a/pkg/client/clientset/versioned/typed/app/v1alpha1/fake/fake_app_client.go +++ b/pkg/client/clientset/versioned/typed/app/v1beta2/fake/fake_app_client.go @@ -3,22 +3,22 @@ package fake import ( - v1alpha1 "github.com/lyft/flinkk8soperator/pkg/client/clientset/versioned/typed/app/v1alpha1" + v1beta1 "github.com/lyft/flinkk8soperator/pkg/client/clientset/versioned/typed/app/v1beta1" rest "k8s.io/client-go/rest" testing "k8s.io/client-go/testing" ) -type FakeFlinkV1alpha1 struct { +type FakeFlinkV1beta2 struct { *testing.Fake } -func (c 
*FakeFlinkV1alpha1) FlinkApplications(namespace string) v1alpha1.FlinkApplicationInterface { +func (c *FakeFlinkV1beta2) FlinkApplications(namespace string) v1beta1.FlinkApplicationInterface { return &FakeFlinkApplications{c, namespace} } // RESTClient returns a RESTClient that is used to communicate // with API server by this client implementation. -func (c *FakeFlinkV1alpha1) RESTClient() rest.Interface { +func (c *FakeFlinkV1beta2) RESTClient() rest.Interface { var ret *rest.RESTClient return ret } diff --git a/pkg/client/clientset/versioned/typed/app/v1alpha1/fake/fake_flinkapplication.go b/pkg/client/clientset/versioned/typed/app/v1beta2/fake/fake_flinkapplication.go similarity index 68% rename from pkg/client/clientset/versioned/typed/app/v1alpha1/fake/fake_flinkapplication.go rename to pkg/client/clientset/versioned/typed/app/v1beta2/fake/fake_flinkapplication.go index 73435df3..1ee1d653 100644 --- a/pkg/client/clientset/versioned/typed/app/v1alpha1/fake/fake_flinkapplication.go +++ b/pkg/client/clientset/versioned/typed/app/v1beta2/fake/fake_flinkapplication.go @@ -3,7 +3,7 @@ package fake import ( - v1alpha1 "github.com/lyft/flinkk8soperator/pkg/apis/app/v1alpha1" + v1beta1 "github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" labels "k8s.io/apimachinery/pkg/labels" schema "k8s.io/apimachinery/pkg/runtime/schema" @@ -14,29 +14,29 @@ import ( // FakeFlinkApplications implements FlinkApplicationInterface type FakeFlinkApplications struct { - Fake *FakeFlinkV1alpha1 + Fake *FakeFlinkV1beta2 ns string } -var flinkapplicationsResource = schema.GroupVersionResource{Group: "flink.k8s.io", Version: "v1alpha1", Resource: "flinkapplications"} +var flinkapplicationsResource = schema.GroupVersionResource{Group: "flink.k8s.io", Version: "v1beta1", Resource: "flinkapplications"} -var flinkapplicationsKind = schema.GroupVersionKind{Group: "flink.k8s.io", Version: "v1alpha1", Kind: "FlinkApplication"} +var flinkapplicationsKind 
= schema.GroupVersionKind{Group: "flink.k8s.io", Version: "v1beta1", Kind: "FlinkApplication"} // Get takes name of the flinkApplication, and returns the corresponding flinkApplication object, and an error if there is any. -func (c *FakeFlinkApplications) Get(name string, options v1.GetOptions) (result *v1alpha1.FlinkApplication, err error) { +func (c *FakeFlinkApplications) Get(name string, options v1.GetOptions) (result *v1beta1.FlinkApplication, err error) { obj, err := c.Fake. - Invokes(testing.NewGetAction(flinkapplicationsResource, c.ns, name), &v1alpha1.FlinkApplication{}) + Invokes(testing.NewGetAction(flinkapplicationsResource, c.ns, name), &v1beta1.FlinkApplication{}) if obj == nil { return nil, err } - return obj.(*v1alpha1.FlinkApplication), err + return obj.(*v1beta1.FlinkApplication), err } // List takes label and field selectors, and returns the list of FlinkApplications that match those selectors. -func (c *FakeFlinkApplications) List(opts v1.ListOptions) (result *v1alpha1.FlinkApplicationList, err error) { +func (c *FakeFlinkApplications) List(opts v1.ListOptions) (result *v1beta1.FlinkApplicationList, err error) { obj, err := c.Fake. 
- Invokes(testing.NewListAction(flinkapplicationsResource, flinkapplicationsKind, c.ns, opts), &v1alpha1.FlinkApplicationList{}) + Invokes(testing.NewListAction(flinkapplicationsResource, flinkapplicationsKind, c.ns, opts), &v1beta1.FlinkApplicationList{}) if obj == nil { return nil, err @@ -46,8 +46,8 @@ func (c *FakeFlinkApplications) List(opts v1.ListOptions) (result *v1alpha1.Flin if label == nil { label = labels.Everything() } - list := &v1alpha1.FlinkApplicationList{ListMeta: obj.(*v1alpha1.FlinkApplicationList).ListMeta} - for _, item := range obj.(*v1alpha1.FlinkApplicationList).Items { + list := &v1beta1.FlinkApplicationList{ListMeta: obj.(*v1beta1.FlinkApplicationList).ListMeta} + for _, item := range obj.(*v1beta1.FlinkApplicationList).Items { if label.Matches(labels.Set(item.Labels)) { list.Items = append(list.Items, item) } @@ -63,31 +63,31 @@ func (c *FakeFlinkApplications) Watch(opts v1.ListOptions) (watch.Interface, err } // Create takes the representation of a flinkApplication and creates it. Returns the server's representation of the flinkApplication, and an error, if there is any. -func (c *FakeFlinkApplications) Create(flinkApplication *v1alpha1.FlinkApplication) (result *v1alpha1.FlinkApplication, err error) { +func (c *FakeFlinkApplications) Create(flinkApplication *v1beta1.FlinkApplication) (result *v1beta1.FlinkApplication, err error) { obj, err := c.Fake. - Invokes(testing.NewCreateAction(flinkapplicationsResource, c.ns, flinkApplication), &v1alpha1.FlinkApplication{}) + Invokes(testing.NewCreateAction(flinkapplicationsResource, c.ns, flinkApplication), &v1beta1.FlinkApplication{}) if obj == nil { return nil, err } - return obj.(*v1alpha1.FlinkApplication), err + return obj.(*v1beta1.FlinkApplication), err } // Update takes the representation of a flinkApplication and updates it. Returns the server's representation of the flinkApplication, and an error, if there is any. 
-func (c *FakeFlinkApplications) Update(flinkApplication *v1alpha1.FlinkApplication) (result *v1alpha1.FlinkApplication, err error) { +func (c *FakeFlinkApplications) Update(flinkApplication *v1beta1.FlinkApplication) (result *v1beta1.FlinkApplication, err error) { obj, err := c.Fake. - Invokes(testing.NewUpdateAction(flinkapplicationsResource, c.ns, flinkApplication), &v1alpha1.FlinkApplication{}) + Invokes(testing.NewUpdateAction(flinkapplicationsResource, c.ns, flinkApplication), &v1beta1.FlinkApplication{}) if obj == nil { return nil, err } - return obj.(*v1alpha1.FlinkApplication), err + return obj.(*v1beta1.FlinkApplication), err } // Delete takes name of the flinkApplication and deletes it. Returns an error if one occurs. func (c *FakeFlinkApplications) Delete(name string, options *v1.DeleteOptions) error { _, err := c.Fake. - Invokes(testing.NewDeleteAction(flinkapplicationsResource, c.ns, name), &v1alpha1.FlinkApplication{}) + Invokes(testing.NewDeleteAction(flinkapplicationsResource, c.ns, name), &v1beta1.FlinkApplication{}) return err } @@ -96,17 +96,17 @@ func (c *FakeFlinkApplications) Delete(name string, options *v1.DeleteOptions) e func (c *FakeFlinkApplications) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { action := testing.NewDeleteCollectionAction(flinkapplicationsResource, c.ns, listOptions) - _, err := c.Fake.Invokes(action, &v1alpha1.FlinkApplicationList{}) + _, err := c.Fake.Invokes(action, &v1beta1.FlinkApplicationList{}) return err } // Patch applies the patch and returns the patched flinkApplication. -func (c *FakeFlinkApplications) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.FlinkApplication, err error) { +func (c *FakeFlinkApplications) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1beta1.FlinkApplication, err error) { obj, err := c.Fake. 
- Invokes(testing.NewPatchSubresourceAction(flinkapplicationsResource, c.ns, name, pt, data, subresources...), &v1alpha1.FlinkApplication{}) + Invokes(testing.NewPatchSubresourceAction(flinkapplicationsResource, c.ns, name, pt, data, subresources...), &v1beta1.FlinkApplication{}) if obj == nil { return nil, err } - return obj.(*v1alpha1.FlinkApplication), err + return obj.(*v1beta1.FlinkApplication), err } diff --git a/pkg/client/clientset/versioned/typed/app/v1alpha1/flinkapplication.go b/pkg/client/clientset/versioned/typed/app/v1beta2/flinkapplication.go similarity index 78% rename from pkg/client/clientset/versioned/typed/app/v1alpha1/flinkapplication.go rename to pkg/client/clientset/versioned/typed/app/v1beta2/flinkapplication.go index 73a064a9..d71015d2 100644 --- a/pkg/client/clientset/versioned/typed/app/v1alpha1/flinkapplication.go +++ b/pkg/client/clientset/versioned/typed/app/v1beta2/flinkapplication.go @@ -1,11 +1,11 @@ // Code generated by client-gen. DO NOT EDIT. -package v1alpha1 +package v1beta1 import ( "time" - v1alpha1 "github.com/lyft/flinkk8soperator/pkg/apis/app/v1alpha1" + v1beta1 "github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1" scheme "github.com/lyft/flinkk8soperator/pkg/client/clientset/versioned/scheme" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" types "k8s.io/apimachinery/pkg/types" @@ -21,14 +21,14 @@ type FlinkApplicationsGetter interface { // FlinkApplicationInterface has methods to work with FlinkApplication resources. 
type FlinkApplicationInterface interface { - Create(*v1alpha1.FlinkApplication) (*v1alpha1.FlinkApplication, error) - Update(*v1alpha1.FlinkApplication) (*v1alpha1.FlinkApplication, error) + Create(*v1beta1.FlinkApplication) (*v1beta1.FlinkApplication, error) + Update(*v1beta1.FlinkApplication) (*v1beta1.FlinkApplication, error) Delete(name string, options *v1.DeleteOptions) error DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error - Get(name string, options v1.GetOptions) (*v1alpha1.FlinkApplication, error) - List(opts v1.ListOptions) (*v1alpha1.FlinkApplicationList, error) + Get(name string, options v1.GetOptions) (*v1beta1.FlinkApplication, error) + List(opts v1.ListOptions) (*v1beta1.FlinkApplicationList, error) Watch(opts v1.ListOptions) (watch.Interface, error) - Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.FlinkApplication, err error) + Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1beta1.FlinkApplication, err error) FlinkApplicationExpansion } @@ -39,7 +39,7 @@ type flinkApplications struct { } // newFlinkApplications returns a FlinkApplications -func newFlinkApplications(c *FlinkV1alpha1Client, namespace string) *flinkApplications { +func newFlinkApplications(c *FlinkV1beta2Client, namespace string) *flinkApplications { return &flinkApplications{ client: c.RESTClient(), ns: namespace, @@ -47,8 +47,8 @@ func newFlinkApplications(c *FlinkV1alpha1Client, namespace string) *flinkApplic } // Get takes name of the flinkApplication, and returns the corresponding flinkApplication object, and an error if there is any. -func (c *flinkApplications) Get(name string, options v1.GetOptions) (result *v1alpha1.FlinkApplication, err error) { - result = &v1alpha1.FlinkApplication{} +func (c *flinkApplications) Get(name string, options v1.GetOptions) (result *v1beta1.FlinkApplication, err error) { + result = &v1beta1.FlinkApplication{} err = c.client.Get(). 
Namespace(c.ns). Resource("flinkapplications"). @@ -60,12 +60,12 @@ func (c *flinkApplications) Get(name string, options v1.GetOptions) (result *v1a } // List takes label and field selectors, and returns the list of FlinkApplications that match those selectors. -func (c *flinkApplications) List(opts v1.ListOptions) (result *v1alpha1.FlinkApplicationList, err error) { +func (c *flinkApplications) List(opts v1.ListOptions) (result *v1beta1.FlinkApplicationList, err error) { var timeout time.Duration if opts.TimeoutSeconds != nil { timeout = time.Duration(*opts.TimeoutSeconds) * time.Second } - result = &v1alpha1.FlinkApplicationList{} + result = &v1beta1.FlinkApplicationList{} err = c.client.Get(). Namespace(c.ns). Resource("flinkapplications"). @@ -92,8 +92,8 @@ func (c *flinkApplications) Watch(opts v1.ListOptions) (watch.Interface, error) } // Create takes the representation of a flinkApplication and creates it. Returns the server's representation of the flinkApplication, and an error, if there is any. -func (c *flinkApplications) Create(flinkApplication *v1alpha1.FlinkApplication) (result *v1alpha1.FlinkApplication, err error) { - result = &v1alpha1.FlinkApplication{} +func (c *flinkApplications) Create(flinkApplication *v1beta1.FlinkApplication) (result *v1beta1.FlinkApplication, err error) { + result = &v1beta1.FlinkApplication{} err = c.client.Post(). Namespace(c.ns). Resource("flinkapplications"). @@ -104,8 +104,8 @@ func (c *flinkApplications) Create(flinkApplication *v1alpha1.FlinkApplication) } // Update takes the representation of a flinkApplication and updates it. Returns the server's representation of the flinkApplication, and an error, if there is any. 
-func (c *flinkApplications) Update(flinkApplication *v1alpha1.FlinkApplication) (result *v1alpha1.FlinkApplication, err error) { - result = &v1alpha1.FlinkApplication{} +func (c *flinkApplications) Update(flinkApplication *v1beta1.FlinkApplication) (result *v1beta1.FlinkApplication, err error) { + result = &v1beta1.FlinkApplication{} err = c.client.Put(). Namespace(c.ns). Resource("flinkapplications"). @@ -144,8 +144,8 @@ func (c *flinkApplications) DeleteCollection(options *v1.DeleteOptions, listOpti } // Patch applies the patch and returns the patched flinkApplication. -func (c *flinkApplications) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.FlinkApplication, err error) { - result = &v1alpha1.FlinkApplication{} +func (c *flinkApplications) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1beta1.FlinkApplication, err error) { + result = &v1beta1.FlinkApplication{} err = c.client.Patch(pt). Namespace(c.ns). Resource("flinkapplications"). diff --git a/pkg/client/clientset/versioned/typed/app/v1alpha1/generated_expansion.go b/pkg/client/clientset/versioned/typed/app/v1beta2/generated_expansion.go similarity index 84% rename from pkg/client/clientset/versioned/typed/app/v1alpha1/generated_expansion.go rename to pkg/client/clientset/versioned/typed/app/v1beta2/generated_expansion.go index ab955a79..eaf95aa7 100644 --- a/pkg/client/clientset/versioned/typed/app/v1alpha1/generated_expansion.go +++ b/pkg/client/clientset/versioned/typed/app/v1beta2/generated_expansion.go @@ -1,5 +1,5 @@ // Code generated by client-gen. DO NOT EDIT. 
-package v1alpha1 +package v1beta1 type FlinkApplicationExpansion interface{} diff --git a/pkg/controller/flink/container_utils.go b/pkg/controller/flink/container_utils.go index 38b53bcd..e760b6f9 100644 --- a/pkg/controller/flink/container_utils.go +++ b/pkg/controller/flink/container_utils.go @@ -33,6 +33,8 @@ const ( FlinkAppHash = "flink-app-hash" FlinkJobProperties = "flink-job-properties" RestartNonce = "restart-nonce" + FlinkApplicationVersionEnv = "FLINK_APPLICATION_VERSION" + FlinkApplicationVersion = "flink-application-version" ) func getFlinkContainerName(containerName string) string { @@ -56,6 +58,9 @@ func getCommonAnnotations(app *v1beta1.FlinkApplication) map[string]string { if app.Spec.RestartNonce != "" { annotations[RestartNonce] = app.Spec.RestartNonce } + if v1beta1.IsBlueGreenDeploymentMode(app.Spec.DeploymentMode) { + annotations[FlinkApplicationVersion] = app.Status.UpdatingVersion + } return annotations } @@ -117,6 +122,7 @@ func GetFlinkContainerEnv(app *v1beta1.FlinkApplication) []v1.EnvVar { if err == nil { env = append(env, flinkEnv...) } + env = append(env, GetDeploySpecificEnv(app)...) return env } @@ -218,3 +224,18 @@ func InjectOperatorCustomizedConfig(deployment *appsv1.Deployment, app *v1beta1. 
} deployment.Spec.Template.Spec.Containers = newContainers } + +// GetDeploySpecificEnv returns the environment variables required for blue-green deploys +func GetDeploySpecificEnv(app *v1beta1.FlinkApplication) []v1.EnvVar { + if !v1beta1.IsBlueGreenDeploymentMode(app.Spec.DeploymentMode) { + return []v1.EnvVar{} + } + + return []v1.EnvVar{ + { + Name: FlinkApplicationVersionEnv, + Value: app.Status.UpdatingVersion, + }, + } + +} diff --git a/pkg/controller/flink/flink.go b/pkg/controller/flink/flink.go index 8470b94c..e8f36ed6 100644 --- a/pkg/controller/flink/flink.go +++ b/pkg/controller/flink/flink.go @@ -27,6 +27,7 @@ import ( const proxyURL = "http://localhost:%d/api/v1/namespaces/%s/services/%s:8081/proxy" const port = 8081 +const indexOffset = 1 // If the last hearbeat from a taskmanager was more than taskManagerHeartbeatThreshold, the task // manager is considered unhealthy. @@ -92,6 +93,24 @@ type ControllerInterface interface { // Compares and updates new job status with current job status // Returns true if there is a change in JobStatus CompareAndUpdateJobStatus(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (bool, error) + + // Gets the last updated cluster status + GetLatestClusterStatus(ctx context.Context, app *v1beta1.FlinkApplication) v1beta1.FlinkClusterStatus + + // Gets the last updated job status + GetLatestJobStatus(ctx context.Context, app *v1beta1.FlinkApplication) v1beta1.FlinkJobStatus + + // Gets the last updated job ID + GetLatestJobID(ctx context.Context, app *v1beta1.FlinkApplication) string + + // Updates the jobID on the latest jobStatus + UpdateLatestJobID(ctx context.Context, app *v1beta1.FlinkApplication, jobID string) + + // Updates jobStatus on the latest VersionStatuses + UpdateLatestJobStatus(ctx context.Context, app *v1beta1.FlinkApplication, jobStatus v1beta1.FlinkJobStatus) + + // Updates clusterStatus on the latest VersionStatuses + UpdateLatestClusterStatus(ctx context.Context, app *v1beta1.FlinkApplication,
jobStatus v1beta1.FlinkClusterStatus) } func NewController(k8sCluster k8.ClusterInterface, eventRecorder record.EventRecorder, config controllerConfig.RuntimeConfig) ControllerInterface { @@ -132,7 +151,7 @@ type Controller struct { eventRecorder record.EventRecorder } -func getURLFromApp(application *v1beta1.FlinkApplication, hash string) string { +func (f *Controller) getURLFromApp(application *v1beta1.FlinkApplication, hash string) string { service := VersionedJobManagerServiceName(application, hash) cfg := controllerConfig.GetConfig() if cfg.UseProxy { @@ -141,23 +160,23 @@ func getURLFromApp(application *v1beta1.FlinkApplication, hash string) string { return fmt.Sprintf("http://%s.%s:%d", service, application.Namespace, port) } -func getClusterOverviewURL(app *v1beta1.FlinkApplication) string { - externalURL := getExternalURLFromApp(app) +func (f *Controller) getClusterOverviewURL(app *v1beta1.FlinkApplication) string { + externalURL := f.getExternalURLFromApp(app) if externalURL != "" { return fmt.Sprintf(externalURL + client.WebUIAnchor + client.GetClusterOverviewURL) } return "" } -func getJobOverviewURL(app *v1beta1.FlinkApplication) string { - externalURL := getExternalURLFromApp(app) +func (f *Controller) getJobOverviewURL(ctx context.Context, app *v1beta1.FlinkApplication) string { + externalURL := f.getExternalURLFromApp(app) if externalURL != "" { - return fmt.Sprintf(externalURL+client.WebUIAnchor+client.GetJobsOverviewURL, app.Status.JobStatus.JobID) + return fmt.Sprintf(externalURL+client.WebUIAnchor+client.GetJobsOverviewURL, f.GetLatestJobID(ctx, app)) } return "" } -func getExternalURLFromApp(application *v1beta1.FlinkApplication) string { +func (f *Controller) getExternalURLFromApp(application *v1beta1.FlinkApplication) string { cfg := controllerConfig.GetConfig() // Local environment if cfg.UseProxy { @@ -195,7 +214,7 @@ func (f *Controller) deploymentMatches(ctx context.Context, deployment *v1.Deplo } func (f *Controller) 
GetJobsForApplication(ctx context.Context, application *v1beta1.FlinkApplication, hash string) ([]client.FlinkJob, error) { - jobResponse, err := f.flinkClient.GetJobs(ctx, getURLFromApp(application, hash)) + jobResponse, err := f.flinkClient.GetJobs(ctx, f.getURLFromApp(application, hash)) if err != nil { return nil, err } @@ -204,11 +223,11 @@ func (f *Controller) GetJobsForApplication(ctx context.Context, application *v1b } func (f *Controller) GetJobForApplication(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error) { - if application.Status.JobStatus.JobID == "" { + if f.GetLatestJobID(ctx, application) == "" { return nil, nil } - jobResponse, err := f.flinkClient.GetJobOverview(ctx, getURLFromApp(application, hash), application.Status.JobStatus.JobID) + jobResponse, err := f.flinkClient.GetJobOverview(ctx, f.getURLFromApp(application, hash), f.GetLatestJobID(ctx, application)) if err != nil { return nil, err } @@ -218,28 +237,28 @@ func (f *Controller) GetJobForApplication(ctx context.Context, application *v1be // The operator for now assumes and is intended to run single application per Flink Cluster. 
// Once we move to run multiple applications, this has to be removed/updated -func (f *Controller) getJobIDForApplication(application *v1beta1.FlinkApplication) (string, error) { - if application.Status.JobStatus.JobID != "" { - return application.Status.JobStatus.JobID, nil +func (f *Controller) getJobIDForApplication(ctx context.Context, application *v1beta1.FlinkApplication) (string, error) { + if f.GetLatestJobID(ctx, application) != "" { + return f.GetLatestJobID(ctx, application), nil } return "", errors.New("active job id not available") } func (f *Controller) CancelWithSavepoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) { - jobID, err := f.getJobIDForApplication(application) + jobID, err := f.getJobIDForApplication(ctx, application) if err != nil { return "", err } - return f.flinkClient.CancelJobWithSavepoint(ctx, getURLFromApp(application, hash), jobID) + return f.flinkClient.CancelJobWithSavepoint(ctx, f.getURLFromApp(application, hash), jobID) } func (f *Controller) ForceCancel(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error { - jobID, err := f.getJobIDForApplication(application) + jobID, err := f.getJobIDForApplication(ctx, application) if err != nil { return err } - return f.flinkClient.ForceCancelJob(ctx, getURLFromApp(application, hash), jobID) + return f.flinkClient.ForceCancelJob(ctx, f.getURLFromApp(application, hash), jobID) } func (f *Controller) CreateCluster(ctx context.Context, application *v1beta1.FlinkApplication) error { @@ -273,7 +292,7 @@ func (f *Controller) StartFlinkJob(ctx context.Context, application *v1beta1.Fli savepointPath string) (string, error) { response, err := f.flinkClient.SubmitJob( ctx, - getURLFromApp(application, hash), + f.getURLFromApp(application, hash), jarName, client.SubmitJobRequest{ Parallelism: parallelism, @@ -293,11 +312,11 @@ func (f *Controller) StartFlinkJob(ctx context.Context, application *v1beta1.Fli } func (f *Controller) 
GetSavepointStatus(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) { - jobID, err := f.getJobIDForApplication(application) + jobID, err := f.getJobIDForApplication(ctx, application) if err != nil { return nil, err } - return f.flinkClient.CheckSavepointStatus(ctx, getURLFromApp(application, hash), jobID, application.Status.SavepointTriggerID) + return f.flinkClient.CheckSavepointStatus(ctx, f.getURLFromApp(application, hash), jobID, application.Status.SavepointTriggerID) } func (f *Controller) IsClusterReady(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) { @@ -319,7 +338,7 @@ func (f *Controller) IsClusterReady(ctx context.Context, application *v1beta1.Fl } func (f *Controller) IsServiceReady(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error) { - resp, err := f.flinkClient.GetClusterOverview(ctx, getURLFromApp(application, hash)) + resp, err := f.flinkClient.GetClusterOverview(ctx, f.getURLFromApp(application, hash)) if err != nil { logger.Infof(ctx, "Error response indicating flink API is not ready to handle request %v", err) return false, err @@ -446,14 +465,15 @@ func (f *Controller) DeleteOldResourcesForApp(ctx context.Context, app *v1beta1. 
} func (f *Controller) FindExternalizedCheckpoint(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) { - checkpoint, err := f.flinkClient.GetLatestCheckpoint(ctx, getURLFromApp(application, hash), application.Status.JobStatus.JobID) + checkpoint, err := f.flinkClient.GetLatestCheckpoint(ctx, f.getURLFromApp(application, hash), f.GetLatestJobID(ctx, application)) var checkpointPath string var checkpointTime int64 if err != nil { + jobStatus := f.GetLatestJobStatus(ctx, application) // we failed to query the JM, try to pull it out of the resource - if application.Status.JobStatus.LastCheckpointPath != "" && application.Status.JobStatus.LastCheckpointTime != nil { - checkpointPath = application.Status.JobStatus.LastCheckpointPath - checkpointTime = application.Status.JobStatus.LastCheckpointTime.Unix() + if jobStatus.LastCheckpointPath != "" && jobStatus.LastCheckpointTime != nil { + checkpointPath = jobStatus.LastCheckpointPath + checkpointTime = jobStatus.LastCheckpointTime.Unix() logger.Warnf(ctx, "Could not query JobManager for latest externalized checkpoint, using"+ " last seen checkpoint") } else { @@ -485,6 +505,9 @@ func (f *Controller) LogEvent(ctx context.Context, app *v1beta1.FlinkApplication // Gets and updates the cluster status func (f *Controller) CompareAndUpdateClusterStatus(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error) { + if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + return f.compareAndUpdateBlueGreenClusterStatus(ctx, application, hash) + } // Error retrieving cluster / taskmanagers overview (after startup/readiness) --> Red // If there is an error this loop will return with Health set to Red oldClusterStatus := application.Status.ClusterStatus @@ -495,10 +518,10 @@ func (f *Controller) CompareAndUpdateClusterStatus(ctx context.Context, applicat return false, err } - application.Status.ClusterStatus.ClusterOverviewURL = 
getClusterOverviewURL(application) + application.Status.ClusterStatus.ClusterOverviewURL = f.getClusterOverviewURL(application) application.Status.ClusterStatus.NumberOfTaskManagers = deployment.Taskmanager.Status.AvailableReplicas // Get Cluster overview - response, err := f.flinkClient.GetClusterOverview(ctx, getURLFromApp(application, hash)) + response, err := f.flinkClient.GetClusterOverview(ctx, f.getURLFromApp(application, hash)) if err != nil { return false, err } @@ -507,7 +530,7 @@ func (f *Controller) CompareAndUpdateClusterStatus(ctx context.Context, applicat application.Status.ClusterStatus.NumberOfTaskSlots = response.NumberOfTaskSlots // Get Healthy Taskmanagers - tmResponse, tmErr := f.flinkClient.GetTaskManagers(ctx, getURLFromApp(application, hash)) + tmResponse, tmErr := f.flinkClient.GetTaskManagers(ctx, f.getURLFromApp(application, hash)) if tmErr != nil { return false, tmErr } @@ -539,7 +562,9 @@ func getHealthyTaskManagerCount(response *client.TaskManagersResponse) int32 { } func (f *Controller) CompareAndUpdateJobStatus(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (bool, error) { - // Initialize the last failing time to beginning of time if it's never been set + if v1beta1.IsBlueGreenDeploymentMode(app.Spec.DeploymentMode) { + return f.compareAndUpdateBlueGreenJobStatus(ctx, app, hash) + } if app.Status.JobStatus.LastFailingTime == nil { initTime := metav1.NewTime(time.Time{}) app.Status.JobStatus.LastFailingTime = &initTime @@ -547,17 +572,17 @@ func (f *Controller) CompareAndUpdateJobStatus(ctx context.Context, app *v1beta1 oldJobStatus := app.Status.JobStatus app.Status.JobStatus.JobID = oldJobStatus.JobID - jobResponse, err := f.flinkClient.GetJobOverview(ctx, getURLFromApp(app, hash), app.Status.JobStatus.JobID) + jobResponse, err := f.flinkClient.GetJobOverview(ctx, f.getURLFromApp(app, hash), f.GetLatestJobID(ctx, app)) if err != nil { return false, err } - checkpoints, err := f.flinkClient.GetCheckpointCounts(ctx, 
getURLFromApp(app, hash), app.Status.JobStatus.JobID) + checkpoints, err := f.flinkClient.GetCheckpointCounts(ctx, f.getURLFromApp(app, hash), f.GetLatestJobID(ctx, app)) if err != nil { return false, err } // Job status - app.Status.JobStatus.JobOverviewURL = getJobOverviewURL(app) + app.Status.JobStatus.JobOverviewURL = f.getJobOverviewURL(ctx, app) app.Status.JobStatus.State = v1beta1.JobState(jobResponse.State) jobStartTime := metav1.NewTime(time.Unix(jobResponse.StartTime/1000, 0)) app.Status.JobStatus.StartTime = &jobStartTime @@ -622,6 +647,222 @@ func (f *Controller) CompareAndUpdateJobStatus(ctx context.Context, app *v1beta1 currTime := metav1.Now() app.Status.JobStatus.LastFailingTime = &currTime } - return !apiequality.Semantic.DeepEqual(oldJobStatus, app.Status.JobStatus), err } + +// Only used with the BlueGreen DeploymentMode +// A method to identify the current VersionStatus +func getCurrentStatusIndex(app *v1beta1.FlinkApplication) int32 { + // The current VersionStatus is the first (or earlier) version when + // 1. The application is a Running phase and there's only one job running + // 2. First deploy ever + // 3. When the savepoint is being taken on the existing job + if v1beta1.IsRunningPhase(app.Status.Phase) || app.Status.DeployHash == "" || + app.Status.Phase == v1beta1.FlinkApplicationSavepointing { + return 0 + } + + if app.Status.Phase == v1beta1.FlinkApplicationDualRunning { + return 1 + } + + // activeJobs and maxRunningJobs would be different once a Teardown has happened and + // the app has moved back to a Running state. 
+ activeJobs := int32(len(app.Status.VersionStatuses)) + maxRunningJobs := v1beta1.GetMaxRunningJobs(app.Spec.DeploymentMode) + index := Min(activeJobs, maxRunningJobs) - indexOffset + return index +} + +func Min(x, y int32) int32 { + if x < y { + return x + } + return y +} + +func (f *Controller) GetLatestClusterStatus(ctx context.Context, application *v1beta1.FlinkApplication) v1beta1.FlinkClusterStatus { + if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + return application.Status.VersionStatuses[getCurrentStatusIndex(application)].ClusterStatus + } + return application.Status.ClusterStatus +} + +func (f *Controller) GetLatestJobStatus(ctx context.Context, application *v1beta1.FlinkApplication) v1beta1.FlinkJobStatus { + if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + return application.Status.VersionStatuses[getCurrentStatusIndex(application)].JobStatus + } + return application.Status.JobStatus + +} + +func (f *Controller) UpdateLatestJobStatus(ctx context.Context, app *v1beta1.FlinkApplication, jobStatus v1beta1.FlinkJobStatus) { + if v1beta1.IsBlueGreenDeploymentMode(app.Spec.DeploymentMode) { + app.Status.VersionStatuses[getCurrentStatusIndex(app)].JobStatus = jobStatus + return + } + app.Status.JobStatus = jobStatus +} + +func (f *Controller) UpdateLatestClusterStatus(ctx context.Context, app *v1beta1.FlinkApplication, clusterStatus v1beta1.FlinkClusterStatus) { + if v1beta1.IsBlueGreenDeploymentMode(app.Spec.DeploymentMode) { + app.Status.VersionStatuses[getCurrentStatusIndex(app)].ClusterStatus = clusterStatus + return + } + app.Status.ClusterStatus = clusterStatus +} + +func (f *Controller) GetLatestJobID(ctx context.Context, application *v1beta1.FlinkApplication) string { + if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + return application.Status.VersionStatuses[getCurrentStatusIndex(application)].JobStatus.JobID + } + return application.Status.JobStatus.JobID +} + +func (f 
*Controller) UpdateLatestJobID(ctx context.Context, app *v1beta1.FlinkApplication, jobID string) { + if v1beta1.IsBlueGreenDeploymentMode(app.Spec.DeploymentMode) { + app.Status.VersionStatuses[getCurrentStatusIndex(app)].JobStatus.JobID = jobID + } + app.Status.JobStatus.JobID = jobID +} + +func (f *Controller) compareAndUpdateBlueGreenClusterStatus(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error) { + isEqual := false + for currIndex := range application.Status.VersionStatuses { + if application.Status.VersionStatuses[currIndex].VersionHash == "" { + continue + } + + oldClusterStatus := application.Status.VersionStatuses[currIndex].ClusterStatus + application.Status.VersionStatuses[currIndex].ClusterStatus.Health = v1beta1.Red + + deployment, err := f.GetCurrentDeploymentsForApp(ctx, application) + if deployment == nil || err != nil { + return false, err + } + + application.Status.VersionStatuses[currIndex].ClusterStatus.ClusterOverviewURL = f.getClusterOverviewURL(application) + application.Status.VersionStatuses[currIndex].ClusterStatus.NumberOfTaskManagers = deployment.Taskmanager.Status.AvailableReplicas + // Get Cluster overview + response, err := f.flinkClient.GetClusterOverview(ctx, f.getURLFromApp(application, hash)) + if err != nil { + return false, err + } + // Update cluster overview + application.Status.VersionStatuses[currIndex].ClusterStatus.AvailableTaskSlots = response.SlotsAvailable + application.Status.VersionStatuses[currIndex].ClusterStatus.NumberOfTaskSlots = response.NumberOfTaskSlots + + // Get Healthy Taskmanagers + tmResponse, tmErr := f.flinkClient.GetTaskManagers(ctx, f.getURLFromApp(application, hash)) + if tmErr != nil { + return false, tmErr + } + application.Status.VersionStatuses[currIndex].ClusterStatus.HealthyTaskManagers = getHealthyTaskManagerCount(tmResponse) + + // Determine Health of the cluster. 
+ // Healthy TaskManagers == Number of taskmanagers --> Green + // Else --> Yellow + if application.Status.VersionStatuses[currIndex].ClusterStatus.HealthyTaskManagers == deployment.Taskmanager.Status.Replicas { + application.Status.VersionStatuses[currIndex].ClusterStatus.Health = v1beta1.Green + } else { + application.Status.VersionStatuses[currIndex].ClusterStatus.Health = v1beta1.Yellow + } + isEqual = isEqual || !apiequality.Semantic.DeepEqual(oldClusterStatus, application.Status.VersionStatuses[currIndex].ClusterStatus) + } + + return isEqual, nil +} + +func (f *Controller) compareAndUpdateBlueGreenJobStatus(ctx context.Context, app *v1beta1.FlinkApplication, hash string) (bool, error) { + isEqual := false + var err error + for statusIndex := range app.Status.VersionStatuses { + if app.Status.VersionStatuses[statusIndex].JobStatus.JobID == "" { + continue + } + + if app.Status.VersionStatuses[statusIndex].JobStatus.LastFailingTime == nil { + initTime := metav1.NewTime(time.Time{}) + app.Status.VersionStatuses[statusIndex].JobStatus.LastFailingTime = &initTime + } + oldJobStatus := app.Status.VersionStatuses[statusIndex].JobStatus + app.Status.VersionStatuses[statusIndex].JobStatus.JobID = oldJobStatus.JobID + jobResponse, err := f.flinkClient.GetJobOverview(ctx, f.getURLFromApp(app, hash), app.Status.VersionStatuses[statusIndex].JobStatus.JobID) + if err != nil { + return false, err + } + checkpoints, err := f.flinkClient.GetCheckpointCounts(ctx, f.getURLFromApp(app, hash), app.Status.VersionStatuses[statusIndex].JobStatus.JobID) + if err != nil { + return false, err + } + + // Job status + app.Status.VersionStatuses[statusIndex].JobStatus.JobOverviewURL = f.getJobOverviewURL(ctx, app) + app.Status.VersionStatuses[statusIndex].JobStatus.State = v1beta1.JobState(jobResponse.State) + jobStartTime := metav1.NewTime(time.Unix(jobResponse.StartTime/1000, 0)) + app.Status.VersionStatuses[statusIndex].JobStatus.StartTime = &jobStartTime + + // Checkpoints status + 
app.Status.VersionStatuses[statusIndex].JobStatus.FailedCheckpointCount = checkpoints.Counts["failed"] + app.Status.VersionStatuses[statusIndex].JobStatus.CompletedCheckpointCount = checkpoints.Counts["completed"] + app.Status.VersionStatuses[statusIndex].JobStatus.JobRestartCount = checkpoints.Counts["restored"] + + latestCheckpoint := checkpoints.Latest.Completed + var lastCheckpointAgeSeconds int + if latestCheckpoint != nil { + lastCheckpointTimeMillis := metav1.NewTime(time.Unix(latestCheckpoint.LatestAckTimestamp/1000, 0)) + app.Status.VersionStatuses[statusIndex].JobStatus.LastCheckpointTime = &lastCheckpointTimeMillis + app.Status.VersionStatuses[statusIndex].JobStatus.LastCheckpointPath = latestCheckpoint.ExternalPath + lastCheckpointAgeSeconds = app.Status.VersionStatuses[statusIndex].JobStatus.LastCheckpointTime.Second() + } + + if checkpoints.Latest.Restored != nil { + app.Status.VersionStatuses[statusIndex].JobStatus.RestorePath = checkpoints.Latest.Restored.ExternalPath + restoreTime := metav1.NewTime(time.Unix(checkpoints.Latest.Restored.RestoredTimeStamp/1000, 0)) + app.Status.VersionStatuses[statusIndex].JobStatus.RestoreTime = &restoreTime + } + + runningTasks := int32(0) + totalTasks := int32(0) + verticesInCreated := int32(0) + + for _, v := range jobResponse.Vertices { + if v.Status == client.Created { + verticesInCreated++ + } + + for k, v := range v.Tasks { + if k == "RUNNING" { + runningTasks += int32(v) + } + totalTasks += int32(v) + } + } + + app.Status.VersionStatuses[statusIndex].JobStatus.RunningTasks = runningTasks + app.Status.VersionStatuses[statusIndex].JobStatus.TotalTasks = totalTasks + + // Health Status for job + // Job is in FAILING state --> RED + // Time since last successful checkpoint > maxCheckpointTime --> YELLOW + // Else --> Green + + if app.Status.VersionStatuses[statusIndex].JobStatus.State == v1beta1.Failing || + time.Since(app.Status.VersionStatuses[statusIndex].JobStatus.LastFailingTime.Time) < 
failingIntervalThreshold || + verticesInCreated > 0 { + app.Status.VersionStatuses[statusIndex].JobStatus.Health = v1beta1.Red + } else if time.Since(time.Unix(int64(lastCheckpointAgeSeconds), 0)) < maxCheckpointTime || + runningTasks < totalTasks { + app.Status.VersionStatuses[statusIndex].JobStatus.Health = v1beta1.Yellow + } else { + app.Status.VersionStatuses[statusIndex].JobStatus.Health = v1beta1.Green + } + // Update LastFailingTime + if app.Status.VersionStatuses[statusIndex].JobStatus.State == v1beta1.Failing { + currTime := metav1.Now() + app.Status.VersionStatuses[statusIndex].JobStatus.LastFailingTime = &currTime + } + isEqual = isEqual || !apiequality.Semantic.DeepEqual(oldJobStatus, app.Status.VersionStatuses[statusIndex].JobStatus) + } + return isEqual, err +} diff --git a/pkg/controller/flink/flink_test.go b/pkg/controller/flink/flink_test.go index 52d15127..57e4c575 100644 --- a/pkg/controller/flink/flink_test.go +++ b/pkg/controller/flink/flink_test.go @@ -39,6 +39,7 @@ const testFlinkVersion = "1.7" const testJarName = "test.jar" const testEntryClass = "com.test.MainClass" const testProgramArgs = "--test" +const testVersion = "version" func getTestFlinkController() Controller { testScope := mockScope.NewTestScope() @@ -69,7 +70,6 @@ func getFlinkTestApp() v1beta1.FlinkApplication { app.Status.JobStatus.JobID = testJobID app.Spec.Image = testImage app.Spec.FlinkVersion = testFlinkVersion - return app } @@ -573,7 +573,7 @@ func TestGetJobsForApplicationErr(t *testing.T) { func TestFindExternalizedCheckpoint(t *testing.T) { flinkControllerForTest := getTestFlinkController() flinkApp := getFlinkTestApp() - flinkApp.Status.JobStatus.JobID = "jobid" + flinkControllerForTest.UpdateLatestJobID(context.Background(), &flinkApp, "jobid") mockJmClient := flinkControllerForTest.flinkClient.(*clientMock.JobManagerClient) mockJmClient.GetLatestCheckpointFunc = func(ctx context.Context, url string, jobId string) (*client.CheckpointStatistics, error) { diff --git 
a/pkg/controller/flink/job_manager_controller.go b/pkg/controller/flink/job_manager_controller.go index 80592dcf..279c0a44 100644 --- a/pkg/controller/flink/job_manager_controller.go +++ b/pkg/controller/flink/job_manager_controller.go @@ -22,6 +22,7 @@ import ( const ( JobManagerNameFormat = "%s-%s-jm" JobManagerPodNameFormat = "%s-%s-jm-pod" + JobManagerVersionPodNameFormat = "%s-%s-jm-%s-pod" JobManagerContainerName = "jobmanager" JobManagerArg = "jobmanager" JobManagerReadinessPath = "/overview" @@ -169,6 +170,10 @@ var JobManagerDefaultResources = coreV1.ResourceRequirements{ func getJobManagerPodName(application *v1beta1.FlinkApplication, hash string) string { applicationName := application.Name + if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + applicationVersion := application.Status.UpdatingVersion + return fmt.Sprintf(JobManagerVersionPodNameFormat, applicationName, hash, applicationVersion) + } return fmt.Sprintf(JobManagerPodNameFormat, applicationName, hash) } diff --git a/pkg/controller/flink/job_manager_controller_test.go b/pkg/controller/flink/job_manager_controller_test.go index 707103a5..fe5376ae 100644 --- a/pkg/controller/flink/job_manager_controller_test.go +++ b/pkg/controller/flink/job_manager_controller_test.go @@ -3,6 +3,8 @@ package flink import ( "testing" + flinkapp "github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1" + "github.com/lyft/flinkk8soperator/pkg/controller/config" k8mock "github.com/lyft/flinkk8soperator/pkg/controller/k8/mock" @@ -42,6 +44,13 @@ func TestGetJobManagerPodName(t *testing.T) { assert.Equal(t, "app-name-"+testAppHash+"-jm-pod", getJobManagerPodName(&app, testAppHash)) } +func TestGetJobManagerPodNameWithVersion(t *testing.T) { + app := getFlinkTestApp() + app.Spec.DeploymentMode = flinkapp.DeploymentModeBlueGreen + app.Status.UpdatingVersion = testVersion + assert.Equal(t, "app-name-"+testAppHash+"-jm-"+testVersion+"-pod", getJobManagerPodName(&app, testAppHash)) +} + func 
TestJobManagerCreateSuccess(t *testing.T) { err := initTestConfigForIngress() assert.Nil(t, err) @@ -295,3 +304,82 @@ func TestJobManagerCreateNoIngress(t *testing.T) { assert.Nil(t, err) assert.False(t, newlyCreated) } + +func TestJobManagerCreateSuccessWithVersion(t *testing.T) { + err := initTestConfigForIngress() + assert.Nil(t, err) + testController := getJMControllerForTest() + app := getFlinkTestApp() + app.Spec.JarName = testJarName + app.Spec.EntryClass = testEntryClass + app.Spec.ProgramArgs = testProgramArgs + app.Spec.DeploymentMode = flinkapp.DeploymentModeBlueGreen + app.Status.UpdatingVersion = testVersion + annotations := map[string]string{ + "key": "annotation", + "flink-application-version": testVersion, + "flink-job-properties": "jarName: " + testJarName + "\nparallelism: 8\nentryClass:" + testEntryClass + "\nprogramArgs:\"" + testProgramArgs + "\"", + } + app.Annotations = annotations + hash := "f0bd1679" + expectedLabels := map[string]string{ + "flink-app": "app-name", + "flink-app-hash": hash, + "flink-deployment-type": "jobmanager", + } + ctr := 0 + mockK8Cluster := testController.k8Cluster.(*k8mock.K8Cluster) + mockK8Cluster.CreateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + ctr++ + switch ctr { + case 1: + deployment := object.(*v1.Deployment) + assert.Equal(t, getJobManagerName(&app, hash), deployment.Name) + assert.Equal(t, app.Namespace, deployment.Namespace) + assert.Equal(t, getJobManagerPodName(&app, hash), deployment.Spec.Template.Name) + assert.Equal(t, annotations, deployment.Annotations) + assert.Equal(t, annotations, deployment.Spec.Template.Annotations) + assert.Equal(t, app.Namespace, deployment.Spec.Template.Namespace) + assert.Equal(t, expectedLabels, deployment.Labels) + assert.Equal(t, int32(1), *deployment.Spec.Replicas) + assert.Equal(t, "app-name", deployment.OwnerReferences[0].Name) + assert.Equal(t, "flink.k8s.io/v1beta1", deployment.OwnerReferences[0].APIVersion) + assert.Equal(t, 
"FlinkApplication", deployment.OwnerReferences[0].Kind) + + assert.Equal(t, "blob.server.port: 6125\njobmanager.heap.size: 1572864k\n"+ + "jobmanager.rpc.port: 6123\n"+ + "jobmanager.web.port: 8081\nmetrics.internal.query-service.port: 50101\n"+ + "query.server.port: 6124\ntaskmanager.heap.size: 524288k\n"+ + "taskmanager.numberOfTaskSlots: 16\n\n"+ + "jobmanager.rpc.address: app-name-"+hash+"\n", + common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env, + "FLINK_PROPERTIES").Value) + assert.Equal(t, testVersion, common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env, + "FLINK_APPLICATION_VERSION").Value) + case 2: + service := object.(*coreV1.Service) + assert.Equal(t, app.Name, service.Name) + assert.Equal(t, app.Namespace, service.Namespace) + assert.Equal(t, map[string]string{"flink-app": "app-name", "flink-app-hash": hash, "flink-deployment-type": "jobmanager"}, service.Spec.Selector) + case 3: + service := object.(*coreV1.Service) + assert.Equal(t, app.Name+"-"+hash, service.Name) + assert.Equal(t, "app-name", service.OwnerReferences[0].Name) + assert.Equal(t, app.Namespace, service.Namespace) + assert.Equal(t, map[string]string{"flink-app": "app-name", "flink-app-hash": hash, "flink-deployment-type": "jobmanager"}, service.Spec.Selector) + case 4: + labels := map[string]string{ + "flink-app": "app-name", + } + ingress := object.(*v1beta1.Ingress) + assert.Equal(t, app.Name, ingress.Name) + assert.Equal(t, app.Namespace, ingress.Namespace) + assert.Equal(t, labels, ingress.Labels) + } + return nil + } + newlyCreated, err := testController.CreateIfNotExist(context.Background(), &app) + assert.Nil(t, err) + assert.True(t, newlyCreated) + assert.Equal(t, 4, ctr) +} diff --git a/pkg/controller/flink/mock/mock_flink.go b/pkg/controller/flink/mock/mock_flink.go index fe05c07f..15b64c91 100644 --- a/pkg/controller/flink/mock/mock_flink.go +++ b/pkg/controller/flink/mock/mock_flink.go @@ -24,6 +24,12 @@ type GetCurrentDeploymentsForAppFunc 
func(ctx context.Context, application *v1be type FindExternalizedCheckpointFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) type CompareAndUpdateClusterStatusFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error) type CompareAndUpdateJobStatusFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error) +type GetLatestClusterStatusFunc func(ctx context.Context, app *v1beta1.FlinkApplication) v1beta1.FlinkClusterStatus +type GetLatestJobStatusFunc func(ctx context.Context, app *v1beta1.FlinkApplication) v1beta1.FlinkJobStatus +type GetLatestJobIDFunc func(ctx context.Context, app *v1beta1.FlinkApplication) string +type UpdateLatestJobIDFunc func(ctx context.Context, app *v1beta1.FlinkApplication, jobID string) +type UpdateLatestJobStatusFunc func(ctx context.Context, app *v1beta1.FlinkApplication, jobStatus v1beta1.FlinkJobStatus) +type UpdateLatestClusterStatusFunc func(ctx context.Context, app *v1beta1.FlinkApplication, clusterStatus v1beta1.FlinkClusterStatus) type FlinkController struct { CreateClusterFunc CreateClusterFunc @@ -41,6 +47,12 @@ type FlinkController struct { Events []corev1.Event CompareAndUpdateClusterStatusFunc CompareAndUpdateClusterStatusFunc CompareAndUpdateJobStatusFunc CompareAndUpdateJobStatusFunc + GetLatestClusterStatusFunc GetLatestClusterStatusFunc + GetLatestJobStatusFunc GetLatestJobStatusFunc + GetLatestJobIDFunc GetLatestJobIDFunc + UpdateLatestJobIDFunc UpdateLatestJobIDFunc + UpdateLatestJobStatusFunc UpdateLatestJobStatusFunc + UpdateLatestClusterStatusFunc UpdateLatestClusterStatusFunc } func (m *FlinkController) GetCurrentDeploymentsForApp(ctx context.Context, application *v1beta1.FlinkApplication) (*common.FlinkDeployment, error) { @@ -156,3 +168,75 @@ func (m *FlinkController) CompareAndUpdateJobStatus(ctx context.Context, app *v1 return false, nil } + +func (m *FlinkController) 
GetLatestClusterStatus(ctx context.Context, application *v1beta1.FlinkApplication) v1beta1.FlinkClusterStatus { + if m.GetLatestClusterStatusFunc != nil { + return m.GetLatestClusterStatusFunc(ctx, application) + } + if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + return application.Status.VersionStatuses[getCurrentStatusIndex(application)].ClusterStatus + } + return application.Status.ClusterStatus +} + +func (m *FlinkController) GetLatestJobStatus(ctx context.Context, application *v1beta1.FlinkApplication) v1beta1.FlinkJobStatus { + if m.GetLatestJobStatusFunc != nil { // guard the hook that is actually invoked below + return m.GetLatestJobStatusFunc(ctx, application) + } + if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + return application.Status.VersionStatuses[getCurrentStatusIndex(application)].JobStatus + } + return application.Status.JobStatus +} + +func (m *FlinkController) GetLatestJobID(ctx context.Context, application *v1beta1.FlinkApplication) string { + if m.GetLatestJobIDFunc != nil { // guard the hook that is actually invoked below + return m.GetLatestJobIDFunc(ctx, application) + } + if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + return application.Status.VersionStatuses[getCurrentStatusIndex(application)].JobStatus.JobID + } + return application.Status.JobStatus.JobID +} + +func (m *FlinkController) UpdateLatestJobID(ctx context.Context, application *v1beta1.FlinkApplication, jobID string) { + if m.UpdateLatestJobIDFunc != nil { + m.UpdateLatestJobIDFunc(ctx, application, jobID) + } + if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + application.Status.VersionStatuses[getCurrentStatusIndex(application)].JobStatus.JobID = jobID + return + } + application.Status.JobStatus.JobID = jobID +} + +func (m *FlinkController) UpdateLatestJobStatus(ctx context.Context, application *v1beta1.FlinkApplication, jobStatus v1beta1.FlinkJobStatus) { + if m.UpdateLatestJobStatusFunc != nil { + m.UpdateLatestJobStatusFunc(ctx, application, jobStatus) +
} + if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + application.Status.VersionStatuses[getCurrentStatusIndex(application)].JobStatus = jobStatus + return + } + application.Status.JobStatus = jobStatus +} + +func (m *FlinkController) UpdateLatestClusterStatus(ctx context.Context, application *v1beta1.FlinkApplication, clusterStatus v1beta1.FlinkClusterStatus) { + if m.UpdateLatestClusterStatusFunc != nil { + m.UpdateLatestClusterStatusFunc(ctx, application, clusterStatus) + } + if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + application.Status.VersionStatuses[getCurrentStatusIndex(application)].ClusterStatus = clusterStatus + return + } + application.Status.ClusterStatus = clusterStatus +} + +func getCurrentStatusIndex(app *v1beta1.FlinkApplication) int32 { + desiredCount := v1beta1.GetMaxRunningJobs(app.Spec.DeploymentMode) + if v1beta1.IsRunningPhase(app.Status.Phase) { + return 0 + } + + return desiredCount - 1 +} diff --git a/pkg/controller/flink/task_manager_controller.go b/pkg/controller/flink/task_manager_controller.go index 30cbdbf6..3b0d9d30 100644 --- a/pkg/controller/flink/task_manager_controller.go +++ b/pkg/controller/flink/task_manager_controller.go @@ -20,11 +20,12 @@ import ( ) const ( - TaskManagerNameFormat = "%s-%s-tm" - TaskManagerPodNameFormat = "%s-%s-tm-pod" - TaskManagerContainerName = "taskmanager" - TaskManagerArg = "taskmanager" - TaskManagerHostnameEnvVar = "TASKMANAGER_HOSTNAME" + TaskManagerNameFormat = "%s-%s-tm" + TaskManagerPodNameFormat = "%s-%s-tm-pod" + TaskManagerVersionPodNameFormat = "%s-%s-tm-%s-pod" + TaskManagerContainerName = "taskmanager" + TaskManagerArg = "taskmanager" + TaskManagerHostnameEnvVar = "TASKMANAGER_HOSTNAME" ) type TaskManagerControllerInterface interface { @@ -142,6 +143,10 @@ func FetchTaskManagerContainerObj(application *v1beta1.FlinkApplication) *coreV1 func getTaskManagerPodName(application *v1beta1.FlinkApplication, hash string) string { 
applicationName := application.Name + if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + applicationVersion := application.Status.UpdatingVersion + return fmt.Sprintf(TaskManagerVersionPodNameFormat, applicationName, hash, applicationVersion) + } return fmt.Sprintf(TaskManagerPodNameFormat, applicationName, hash) } diff --git a/pkg/controller/flink/task_manager_controller_test.go b/pkg/controller/flink/task_manager_controller_test.go index 1038d4d3..9f3edca6 100644 --- a/pkg/controller/flink/task_manager_controller_test.go +++ b/pkg/controller/flink/task_manager_controller_test.go @@ -50,6 +50,13 @@ func TestGetTaskManagerPodName(t *testing.T) { assert.Equal(t, "app-name-"+testAppHash+"-tm-pod", getTaskManagerPodName(&app, testAppHash)) } +func TestGetTaskManagerPodNameWithVersion(t *testing.T) { + app := getFlinkTestApp() + app.Spec.DeploymentMode = v1beta1.DeploymentModeBlueGreen + app.Status.UpdatingVersion = testVersion + assert.Equal(t, "app-name-"+testAppHash+"-tm-"+testVersion+"-pod", getTaskManagerPodName(&app, testAppHash)) +} + func TestTaskManagerCreateSuccess(t *testing.T) { testController := getTMControllerForTest() app := getFlinkTestApp() @@ -223,3 +230,55 @@ func TestTaskManagerCreateAlreadyExists(t *testing.T) { assert.Nil(t, err) assert.False(t, newlyCreated) } + +func TestTaskManagerCreateSuccessWithVersion(t *testing.T) { + testController := getTMControllerForTest() + app := getFlinkTestApp() + app.Spec.JarName = testJarName + app.Spec.EntryClass = testEntryClass + app.Spec.ProgramArgs = testProgramArgs + app.Spec.DeploymentMode = v1beta1.DeploymentModeBlueGreen + app.Status.UpdatingVersion = testVersion + annotations := map[string]string{ + "key": "annotation", + "flink-application-version": testVersion, + "flink-job-properties": "jarName: test.jar\nparallelism: 8\nentryClass:com.test.MainClass\nprogramArgs:\"--test\"", + } + + hash := "f0bd1679" + + app.Annotations = annotations + expectedLabels := map[string]string{ + 
"flink-app": "app-name", + "flink-app-hash": hash, + "flink-deployment-type": "taskmanager", + } + mockK8Cluster := testController.k8Cluster.(*k8mock.K8Cluster) + mockK8Cluster.CreateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + deployment := object.(*v1.Deployment) + assert.Equal(t, getTaskManagerName(&app, hash), deployment.Name) + assert.Equal(t, app.Namespace, deployment.Namespace) + assert.Equal(t, getTaskManagerPodName(&app, hash), deployment.Spec.Template.Name) + assert.Equal(t, annotations, deployment.Annotations) + assert.Equal(t, annotations, deployment.Spec.Template.Annotations) + assert.Equal(t, app.Namespace, deployment.Spec.Template.Namespace) + assert.Equal(t, expectedLabels, deployment.Labels) + + assert.Equal(t, "blob.server.port: 6125\njobmanager.heap.size: 1572864k\n"+ + "jobmanager.rpc.port: 6123\n"+ + "jobmanager.web.port: 8081\nmetrics.internal.query-service.port: 50101\n"+ + "query.server.port: 6124\ntaskmanager.heap.size: 524288k\n"+ + "taskmanager.numberOfTaskSlots: 16\n\n"+ + "jobmanager.rpc.address: app-name-"+hash+"\n"+ + "taskmanager.host: $HOST_IP\n", + common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env, + "FLINK_PROPERTIES").Value) + assert.Equal(t, testVersion, common.GetEnvVar(deployment.Spec.Template.Spec.Containers[0].Env, + "FLINK_APPLICATION_VERSION").Value) + + return nil + } + newlyCreated, err := testController.CreateIfNotExist(context.Background(), &app) + assert.Nil(t, err) + assert.True(t, newlyCreated) +} diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index 668927eb..8ecd201d 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -153,6 +153,8 @@ func (s *FlinkStateMachine) handle(ctx context.Context, application *v1beta1.Fli updateApplication := false updateLastSeenError := false appPhase := application.Status.Phase + // 
initialize application status array if it's not yet been initialized + s.initializeAppStatusIfEmpty(ctx, application) if !application.ObjectMeta.DeletionTimestamp.IsZero() && appPhase != v1beta1.FlinkApplicationDeleting { s.updateApplicationPhase(application, v1beta1.FlinkApplicationDeleting) @@ -293,6 +295,28 @@ func (s *FlinkStateMachine) handleClusterStarting(ctx context.Context, applicati return statusChanged, nil } +func (s *FlinkStateMachine) initializeAppStatusIfEmpty(ctx context.Context, application *v1beta1.FlinkApplication) { + // initialize the app status array to include 2 status elements in case of blue green deploys + // else use a one element array + if v1beta1.IsBlueGreenDeploymentMode(application.Spec.DeploymentMode) { + application.Status.VersionStatuses = make([]v1beta1.FlinkApplicationVersionStatus, v1beta1.GetMaxRunningJobs(application.Spec.DeploymentMode)) + + // If an application is moving from a Dual to BlueGreen deployment mode, + // We pre-populate the version statuses array with the current Job and Cluster Status + // And reset top-level ClusterStatus and JobStatus to empty structs + // as they'll no longer get updated + if application.Status.JobStatus != (v1beta1.FlinkJobStatus{}) { + s.flinkController.UpdateLatestJobStatus(ctx, application, application.Status.JobStatus) + application.Status.JobStatus = v1beta1.FlinkJobStatus{} + } + + if application.Status.ClusterStatus != (v1beta1.FlinkClusterStatus{}) { + s.flinkController.UpdateLatestClusterStatus(ctx, application, application.Status.ClusterStatus) + application.Status.ClusterStatus = v1beta1.FlinkClusterStatus{} + } + } +} + func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) { // we've already savepointed (or this is our first deploy), continue on if application.Status.SavepointPath != "" || application.Status.DeployHash == "" { @@ -317,7 +341,7 @@ func (s *FlinkStateMachine) 
handleApplicationSavepointing(ctx context.Context, a } s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "CancellingJob", - fmt.Sprintf("Cancelling job %s with a final savepoint", application.Status.JobStatus.JobID)) + fmt.Sprintf("Cancelling job %s with a final savepoint", s.flinkController.GetLatestJobID(ctx, application))) application.Status.SavepointTriggerID = triggerID return statusChanged, nil @@ -335,7 +359,7 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a // TODO: we should probably retry this a few times before failing s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "SavepointFailed", fmt.Sprintf("Failed to take savepoint for job %s: %v", - application.Status.JobStatus.JobID, savepointStatusResponse.Operation.FailureCause)) + s.flinkController.GetLatestJobID(ctx, application), savepointStatusResponse.Operation.FailureCause)) application.Status.RetryCount = 0 s.updateApplicationPhase(application, v1beta1.FlinkApplicationRecovering) return statusChanged, nil @@ -344,7 +368,7 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a fmt.Sprintf("Canceled job with savepoint %s", savepointStatusResponse.Operation.Location)) application.Status.SavepointPath = savepointStatusResponse.Operation.Location - application.Status.JobStatus.JobID = "" + s.flinkController.UpdateLatestJobID(ctx, application, "") s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob) return statusChanged, nil } @@ -421,7 +445,7 @@ func (s *FlinkStateMachine) handleApplicationRecovering(ctx context.Context, app path, flink.HashForApplication(app))) app.Status.SavepointPath = path - app.Status.JobStatus.JobID = "" + s.flinkController.UpdateLatestJobID(ctx, app, "") s.updateApplicationPhase(app, v1beta1.FlinkApplicationSubmittingJob) return statusChanged, nil } @@ -436,8 +460,8 @@ func (s *FlinkStateMachine) submitJobIfNeeded(ctx context.Context, app *v1beta1. 
} // Check if the job id has already been set on our application - if app.Status.JobStatus.JobID != "" { - return app.Status.JobStatus.JobID, nil + if s.flinkController.GetLatestJobID(ctx, app) != "" { + return s.flinkController.GetLatestJobID(ctx, app), nil } // Check that there are no jobs running before starting the job @@ -503,7 +527,7 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta // Something's gone wrong; roll back s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobSubmissionFailed", fmt.Sprintf("Failed to submit job: %s", reason)) - app.Status.JobStatus.JobID = "" + s.flinkController.UpdateLatestJobID(ctx, app, "") s.updateApplicationPhase(app, v1beta1.FlinkApplicationRollingBackJob) return statusChanged, nil } @@ -521,7 +545,7 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta logger.Errorf(ctx, "Updating cluster status failed with error: %v", clusterErr) } - if app.Status.JobStatus.JobID == "" { + if s.flinkController.GetLatestJobID(ctx, app) == "" { savepointPath := "" if app.Status.DeployHash == "" { // this is the first deploy, use the user-provided savepoint @@ -543,7 +567,7 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta } if appJobID != "" { - app.Status.JobStatus.JobID = appJobID + s.flinkController.UpdateLatestJobID(ctx, app, appJobID) return statusChanged, nil } @@ -557,7 +581,7 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta return statusUnchanged, err } if job == nil { - return statusUnchanged, errors.Errorf("Could not find job %s", app.Status.JobStatus.JobID) + return statusUnchanged, errors.Errorf("Could not find job %s", s.flinkController.GetLatestJobID(ctx, app)) } // wait until all vertices have been scheduled and started @@ -571,12 +595,13 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta app.Status.DeployHash = hash app.Status.SavepointPath = "" 
app.Status.SavepointTriggerID = "" - app.Status.JobStatus.JarName = app.Spec.JarName - app.Status.JobStatus.Parallelism = app.Spec.Parallelism - app.Status.JobStatus.EntryClass = app.Spec.EntryClass - app.Status.JobStatus.ProgramArgs = app.Spec.ProgramArgs - app.Status.JobStatus.AllowNonRestoredState = app.Spec.AllowNonRestoredState - + jobStatus := s.flinkController.GetLatestJobStatus(ctx, app) + jobStatus.JarName = app.Spec.JarName + jobStatus.Parallelism = app.Spec.Parallelism + jobStatus.EntryClass = app.Spec.EntryClass + jobStatus.ProgramArgs = app.Spec.ProgramArgs + jobStatus.AllowNonRestoredState = app.Spec.AllowNonRestoredState + s.flinkController.UpdateLatestJobStatus(ctx, app, jobStatus) s.updateApplicationPhase(app, v1beta1.FlinkApplicationRunning) return statusChanged, nil } @@ -617,10 +642,11 @@ func (s *FlinkStateMachine) handleRollingBack(ctx context.Context, app *v1beta1. } // submit the old job + jobStatus := s.flinkController.GetLatestJobStatus(ctx, app) jobID, err := s.submitJobIfNeeded(ctx, app, app.Status.DeployHash, - app.Status.JobStatus.JarName, app.Status.JobStatus.Parallelism, - app.Status.JobStatus.EntryClass, app.Status.JobStatus.ProgramArgs, - app.Status.JobStatus.AllowNonRestoredState, + jobStatus.JarName, jobStatus.Parallelism, + jobStatus.EntryClass, jobStatus.ProgramArgs, + jobStatus.AllowNonRestoredState, app.Status.SavepointPath) // set rollbackHash @@ -630,7 +656,7 @@ func (s *FlinkStateMachine) handleRollingBack(ctx context.Context, app *v1beta1. 
} if jobID != "" { - app.Status.JobStatus.JobID = jobID + s.flinkController.UpdateLatestJobID(ctx, app, jobID) app.Status.SavepointPath = "" app.Status.SavepointTriggerID = "" // move to the deploy failed state @@ -666,7 +692,7 @@ func (s *FlinkStateMachine) handleApplicationRunning(ctx context.Context, applic } if job == nil { - logger.Warnf(ctx, "Could not find active job {}", application.Status.JobStatus.JobID) + logger.Warnf(ctx, "Could not find active job {}", s.flinkController.GetLatestJobID(ctx, application)) } else { logger.Debugf(ctx, "Application running with job %v", job.JobID) } diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go index a8534233..fc15c62c 100644 --- a/pkg/controller/flinkapplication/flink_state_machine_test.go +++ b/pkg/controller/flinkapplication/flink_state_machine_test.go @@ -453,6 +453,20 @@ func TestSubmittingToRunning(t *testing.T) { return jobID, nil } + mockFlinkController.GetJobsForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) ([]client.FlinkJob, error) { + assert.Equal(t, appHash, hash) + if startCount > 0 { + return []client.FlinkJob{ + { + JobID: jobID, + Status: client.Running, + }, + }, nil + } + return nil, nil + + } + mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) getServiceCount := 0 @@ -498,14 +512,15 @@ func TestSubmittingToRunning(t *testing.T) { mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { if statusUpdateCount == 0 { application := object.(*v1beta1.FlinkApplication) - assert.Equal(t, jobID, application.Status.JobStatus.JobID) + assert.Equal(t, jobID, mockFlinkController.GetLatestJobID(ctx, application)) } else if statusUpdateCount == 1 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, appHash, application.Status.DeployHash) - assert.Equal(t, app.Spec.JarName, app.Status.JobStatus.JarName) - 
assert.Equal(t, app.Spec.Parallelism, app.Status.JobStatus.Parallelism) - assert.Equal(t, app.Spec.EntryClass, app.Status.JobStatus.EntryClass) - assert.Equal(t, app.Spec.ProgramArgs, app.Status.JobStatus.ProgramArgs) + jobStatus := mockFlinkController.GetLatestJobStatus(ctx, application) + assert.Equal(t, app.Spec.JarName, jobStatus.JarName) + assert.Equal(t, app.Spec.Parallelism, jobStatus.Parallelism) + assert.Equal(t, app.Spec.EntryClass, jobStatus.EntryClass) + assert.Equal(t, app.Spec.ProgramArgs, jobStatus.ProgramArgs) assert.Equal(t, v1beta1.FlinkApplicationRunning, application.Status.Phase) } statusUpdateCount++ @@ -585,11 +600,15 @@ func TestRollingBack(t *testing.T) { Phase: v1beta1.FlinkApplicationRollingBackJob, DeployHash: "old-hash", SavepointPath: "file:///savepoint", - JobStatus: v1beta1.FlinkJobStatus{ - JarName: "old-job.jar", - Parallelism: 10, - EntryClass: "com.my.OldClass", - ProgramArgs: "--no-test", + VersionStatuses: []v1beta1.FlinkApplicationVersionStatus{ + v1beta1.FlinkApplicationVersionStatus{ + JobStatus: v1beta1.FlinkJobStatus{ + JarName: "old-job.jar", + Parallelism: 10, + EntryClass: "com.my.OldClass", + ProgramArgs: "--no-test", + }, + }, }, }, } @@ -608,11 +627,12 @@ func TestRollingBack(t *testing.T) { startCalled = true assert.Equal(t, "old-hash", hash) - assert.Equal(t, app.Status.JobStatus.JarName, jarName) - assert.Equal(t, app.Status.JobStatus.Parallelism, parallelism) - assert.Equal(t, app.Status.JobStatus.EntryClass, entryClass) - assert.Equal(t, app.Status.JobStatus.ProgramArgs, programArgs) - assert.Equal(t, app.Status.JobStatus.AllowNonRestoredState, allowNonRestoredState) + jobStatus := mockFlinkController.GetLatestJobStatus(ctx, application) + assert.Equal(t, jobStatus.JarName, jarName) + assert.Equal(t, jobStatus.Parallelism, parallelism) + assert.Equal(t, jobStatus.EntryClass, entryClass) + assert.Equal(t, jobStatus.ProgramArgs, programArgs) + assert.Equal(t, jobStatus.AllowNonRestoredState, allowNonRestoredState) 
assert.Equal(t, app.Status.SavepointPath, savepointPath) return jobID, nil } @@ -763,8 +783,12 @@ func TestDeleteWithSavepoint(t *testing.T) { Status: v1beta1.FlinkApplicationStatus{ Phase: v1beta1.FlinkApplicationDeleting, DeployHash: "deployhash", - JobStatus: v1beta1.FlinkJobStatus{ - JobID: jobID, + VersionStatuses: []v1beta1.FlinkApplicationVersionStatus{ + v1beta1.FlinkApplicationVersionStatus{ + JobStatus: v1beta1.FlinkJobStatus{ + JobID: jobID, + }, + }, }, }, } @@ -875,8 +899,12 @@ func TestDeleteWithSavepointAndFinishedJob(t *testing.T) { Phase: v1beta1.FlinkApplicationDeleting, DeployHash: "deployhash", SavepointPath: "file:///savepoint", - JobStatus: v1beta1.FlinkJobStatus{ - JobID: jobID, + VersionStatuses: []v1beta1.FlinkApplicationVersionStatus{ + v1beta1.FlinkApplicationVersionStatus{ + JobStatus: v1beta1.FlinkJobStatus{ + JobID: jobID, + }, + }, }, }, } @@ -922,9 +950,14 @@ func TestDeleteWithForceCancel(t *testing.T) { }, Status: v1beta1.FlinkApplicationStatus{ Phase: v1beta1.FlinkApplicationDeleting, - JobStatus: v1beta1.FlinkJobStatus{ - JobID: jobID, + VersionStatuses: []v1beta1.FlinkApplicationVersionStatus{ + v1beta1.FlinkApplicationVersionStatus{ + JobStatus: v1beta1.FlinkJobStatus{ + JobID: jobID, + }, + }, }, + DeployHash: "deployhash", }, } @@ -1142,7 +1175,7 @@ func TestRollbackWithFailFastError(t *testing.T) { getCount := 0 mockFlinkController.GetJobsForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) ([]client.FlinkJob, error) { var res []client.FlinkJob - if getCount == 1 { + if getCount == 2 { res = []client.FlinkJob{ { JobID: "jid1", @@ -1227,19 +1260,23 @@ func TestRollbackAfterJobSubmission(t *testing.T) { Status: v1beta1.FlinkApplicationStatus{ Phase: v1beta1.FlinkApplicationSubmittingJob, DeployHash: "old-hash-retry-err", - JobStatus: v1beta1.FlinkJobStatus{ - JobID: "jobid", + VersionStatuses: []v1beta1.FlinkApplicationVersionStatus{ + { + JobStatus: v1beta1.FlinkJobStatus{ + 
JobID: "jobid", + }, + }, }, }, } stateMachineForTest := getTestStateMachine() - + mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController) err := stateMachineForTest.Handle(context.Background(), &app) assert.Nil(t, err) assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, app.Status.Phase) - assert.Equal(t, "", app.Status.JobStatus.JobID) + assert.Equal(t, "", mockFlinkController.GetLatestJobID(context.Background(), &app)) } func TestErrorHandlingInRunningPhase(t *testing.T) { diff --git a/pkg/controller/k8/cluster.go b/pkg/controller/k8/cluster.go index ff4c0eed..1da90a1c 100644 --- a/pkg/controller/k8/cluster.go +++ b/pkg/controller/k8/cluster.go @@ -60,6 +60,7 @@ func newK8ClusterMetrics(scope promutils.Scope) *k8ClusterMetrics { updateSuccess: labeled.NewCounter("update_success", "K8 object updated successfully", k8ClusterScope), updateFailure: labeled.NewCounter("update_failure", "K8 object update failed", k8ClusterScope), updateConflicts: labeled.NewCounter("update_conflict", "K8 object update failed due to a conflict", k8ClusterScope), + updateInvalidVersion: labeled.NewCounter("update_invalide_version", "K8 object update failed due to an invalid version", k8ClusterScope), deleteSuccess: labeled.NewCounter("delete_success", "K8 object deleted successfully", k8ClusterScope), deleteFailure: labeled.NewCounter("delete_failure", "K8 object deletion failed", k8ClusterScope), getDeploymentCacheHit: labeled.NewCounter("get_deployment_cache_hit", "Deployment fetched from cache", k8ClusterScope), @@ -81,6 +82,7 @@ type k8ClusterMetrics struct { updateSuccess labeled.Counter updateFailure labeled.Counter updateConflicts labeled.Counter + updateInvalidVersion labeled.Counter deleteSuccess labeled.Counter deleteFailure labeled.Counter getDeploymentCacheHit labeled.Counter @@ -199,9 +201,30 @@ func (k *Cluster) UpdateK8Object(ctx context.Context, object runtime.Object) err func (k *Cluster) UpdateStatus(ctx context.Context, object 
runtime.Object) error { objectCopy := object.DeepCopyObject() - err := k.client.Status().Update(ctx, objectCopy) if err != nil { + if errors.IsInvalid(err) { + // This is a Kubernetes bug that has been fixed in k8s 1.15 + // https://github.com/kubernetes/kubernetes/pull/78713 + // The bug prevents status sub-resources from being updated when + // the stored version of the CRD changes + // Example of error: + // K8s object update failed FlinkApplication.flink.k8s.io "operator-test-app" is invalid: + // apiVersion: Invalid value: "flink.k8s.io/v1beta1": must be flink.k8s.io/v1beta1 + // app_name=operator-test-app ns=default phase=Running src="cluster.go:209" + // This should only ever be encountered once (per application) + // when a new CRD version is deployed and an older version of the application exists + // As a workaround, we try to update the entire resource instead of only the status + // TODO Remove this block when we upgrade to k8s 1.15 + logger.Warn(ctx, "Status sub-resource update failed, attempting to update the entire resource instead") + k.metrics.updateInvalidVersion.Inc(ctx) + updateErr := k.client.Update(ctx, object) + if updateErr != nil { + logger.Errorf(ctx, "K8s object update failed %v", updateErr) + k.metrics.updateFailure.Inc(ctx) + return updateErr + } + } if errors.IsConflict(err) { logger.Warnf(ctx, "Conflict while updating status") k.metrics.updateConflicts.Inc(ctx)