From e70dc3d9c030098e65cfb82ddc853804d0e3739f Mon Sep 17 00:00:00 2001 From: Fabian Fischer Date: Wed, 23 Nov 2022 15:24:13 +0100 Subject: [PATCH 01/21] Add kafka CRD --- apis/exoscale/v1/kafka_types.go | 112 +++++ apis/exoscale/v1/zz_generated.deepcopy.go | 139 ++++++ apis/exoscale/v1/zz_generated.managed.go | 66 +++ apis/exoscale/v1/zz_generated.managedlist.go | 9 + .../crds/exoscale.crossplane.io_kafkas.yaml | 394 ++++++++++++++++++ package/webhook/manifests.yaml | 20 + 6 files changed, 740 insertions(+) create mode 100644 apis/exoscale/v1/kafka_types.go create mode 100644 package/crds/exoscale.crossplane.io_kafkas.yaml diff --git a/apis/exoscale/v1/kafka_types.go b/apis/exoscale/v1/kafka_types.go new file mode 100644 index 00000000..5f6e08d2 --- /dev/null +++ b/apis/exoscale/v1/kafka_types.go @@ -0,0 +1,112 @@ +package v1 + +import ( + "reflect" + + xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" + "github.com/crossplane/crossplane-runtime/pkg/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// KafkaParameters are the configurable fields of a Kafka instance. +type KafkaParameters struct { + Maintenance MaintenanceSpec `json:"maintenance,omitempty"` + + // +kubebuilder:validation:Required + + // Zone is the datacenter identifier in which the instance runs in. + Zone Zone `json:"zone"` + + DBaaSParameters `json:",inline"` + + // KafkaSettings contains additional Kafka settings. + KafkaSettings runtime.RawExtension `json:"kafkaSettings,omitempty"` +} + +// KafkaSpec defines the desired state of a Kafka. +type KafkaSpec struct { + xpv1.ResourceSpec `json:",inline"` + ForProvider KafkaParameters `json:"forProvider"` +} + +// KafkaObservation are the observable fields of a Kafka. +type KafkaObservation struct { + Version string `json:"version,omitempty"` + // KafkaSettings contains additional Kafka settings as set by the provider. 
+ KafkaSettings runtime.RawExtension `json:"kafkaSettings,omitempty"` + + // State of individual service nodes + NodeStates []NodeState `json:"nodeStates,omitempty"` + + // Service notifications + Notifications []Notification `json:"notifications,omitempty"` +} + +// KafkaStatus represents the observed state of a Kafka instance. +type KafkaStatus struct { + xpv1.ResourceStatus `json:",inline"` + AtProvider KafkaObservation `json:"atProvider,omitempty"` +} + +// +kubebuilder:object:root=true +// +kubebuilder:printcolumn:name="State",type="string",JSONPath=".status.conditions[?(@.type=='Ready')].reason" +// +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type=='Ready')].status" +// +kubebuilder:printcolumn:name="Synced",type="string",JSONPath=".status.conditions[?(@.type=='Synced')].status" +// +kubebuilder:printcolumn:name="External Name",type="string",JSONPath=".metadata.annotations.crossplane\\.io/external-name" +// +kubebuilder:printcolumn:name="Plan",type="string",JSONPath=".spec.forProvider.size.plan" +// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" +// +kubebuilder:subresource:status +// +kubebuilder:resource:scope=Cluster,categories={crossplane,exoscale} +// +kubebuilder:webhook:verbs=create;update,path=/validate-exoscale-crossplane-io-v1-kafka,mutating=false,failurePolicy=fail,groups=exoscale.crossplane.io,resources=kafka,versions=v1,name=kafka.exoscale.crossplane.io,sideEffects=None,admissionReviewVersions=v1 + +// Kafka is the API for creating Kafka. +type Kafka struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec KafkaSpec `json:"spec"` + Status KafkaStatus `json:"status,omitempty"` +} + +// GetProviderConfigName returns the name of the ProviderConfig. +// Returns empty string if reference not given. 
+func (in *Kafka) GetProviderConfigName() string { + if ref := in.GetProviderConfigReference(); ref != nil { + return ref.Name + } + return "" +} + +// GetInstanceName returns the external name of the instance in the following precedence: +// +// .metadata.annotations."crossplane.io/external-name" +// .metadata.name +func (in *Kafka) GetInstanceName() string { + if name := meta.GetExternalName(in); name != "" { + return name + } + return in.Name +} + +// +kubebuilder:object:root=true + +// KafkaList contains a list of Kafka +type KafkaList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []Kafka `json:"items"` +} + +// Kafka type metadata. +var ( + KafkaKind = reflect.TypeOf(Kafka{}).Name() + KafkaGroupKind = schema.GroupKind{Group: Group, Kind: KafkaKind}.String() + KafkaKindAPIVersion = KafkaKind + "." + SchemeGroupVersion.String() + KafkaGroupVersionKind = SchemeGroupVersion.WithKind(KafkaKind) +) + +func init() { + SchemeBuilder.Register(&Kafka{}, &KafkaList{}) +} diff --git a/apis/exoscale/v1/zz_generated.deepcopy.go b/apis/exoscale/v1/zz_generated.deepcopy.go index a5f099b3..5f735225 100644 --- a/apis/exoscale/v1/zz_generated.deepcopy.go +++ b/apis/exoscale/v1/zz_generated.deepcopy.go @@ -312,6 +312,145 @@ func (in IPFilter) DeepCopy() IPFilter { return *out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Kafka) DeepCopyInto(out *Kafka) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Kafka. 
+func (in *Kafka) DeepCopy() *Kafka { + if in == nil { + return nil + } + out := new(Kafka) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *Kafka) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KafkaList) DeepCopyInto(out *KafkaList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]Kafka, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaList. +func (in *KafkaList) DeepCopy() *KafkaList { + if in == nil { + return nil + } + out := new(KafkaList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *KafkaList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KafkaObservation) DeepCopyInto(out *KafkaObservation) { + *out = *in + in.KafkaSettings.DeepCopyInto(&out.KafkaSettings) + if in.NodeStates != nil { + in, out := &in.NodeStates, &out.NodeStates + *out = make([]NodeState, len(*in)) + copy(*out, *in) + } + if in.Notifications != nil { + in, out := &in.Notifications, &out.Notifications + *out = make([]Notification, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaObservation. 
+func (in *KafkaObservation) DeepCopy() *KafkaObservation { + if in == nil { + return nil + } + out := new(KafkaObservation) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KafkaParameters) DeepCopyInto(out *KafkaParameters) { + *out = *in + out.Maintenance = in.Maintenance + in.DBaaSParameters.DeepCopyInto(&out.DBaaSParameters) + in.KafkaSettings.DeepCopyInto(&out.KafkaSettings) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaParameters. +func (in *KafkaParameters) DeepCopy() *KafkaParameters { + if in == nil { + return nil + } + out := new(KafkaParameters) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KafkaSpec) DeepCopyInto(out *KafkaSpec) { + *out = *in + in.ResourceSpec.DeepCopyInto(&out.ResourceSpec) + in.ForProvider.DeepCopyInto(&out.ForProvider) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaSpec. +func (in *KafkaSpec) DeepCopy() *KafkaSpec { + if in == nil { + return nil + } + out := new(KafkaSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KafkaStatus) DeepCopyInto(out *KafkaStatus) { + *out = *in + in.ResourceStatus.DeepCopyInto(&out.ResourceStatus) + in.AtProvider.DeepCopyInto(&out.AtProvider) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaStatus. +func (in *KafkaStatus) DeepCopy() *KafkaStatus { + if in == nil { + return nil + } + out := new(KafkaStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *MaintenanceSpec) DeepCopyInto(out *MaintenanceSpec) { *out = *in diff --git a/apis/exoscale/v1/zz_generated.managed.go b/apis/exoscale/v1/zz_generated.managed.go index c85ef64f..fc8345b6 100644 --- a/apis/exoscale/v1/zz_generated.managed.go +++ b/apis/exoscale/v1/zz_generated.managed.go @@ -136,6 +136,72 @@ func (mg *IAMKey) SetWriteConnectionSecretToReference(r *xpv1.SecretReference) { mg.Spec.WriteConnectionSecretToReference = r } +// GetCondition of this Kafka. +func (mg *Kafka) GetCondition(ct xpv1.ConditionType) xpv1.Condition { + return mg.Status.GetCondition(ct) +} + +// GetDeletionPolicy of this Kafka. +func (mg *Kafka) GetDeletionPolicy() xpv1.DeletionPolicy { + return mg.Spec.DeletionPolicy +} + +// GetProviderConfigReference of this Kafka. +func (mg *Kafka) GetProviderConfigReference() *xpv1.Reference { + return mg.Spec.ProviderConfigReference +} + +/* +GetProviderReference of this Kafka. +Deprecated: Use GetProviderConfigReference. +*/ +func (mg *Kafka) GetProviderReference() *xpv1.Reference { + return mg.Spec.ProviderReference +} + +// GetPublishConnectionDetailsTo of this Kafka. +func (mg *Kafka) GetPublishConnectionDetailsTo() *xpv1.PublishConnectionDetailsTo { + return mg.Spec.PublishConnectionDetailsTo +} + +// GetWriteConnectionSecretToReference of this Kafka. +func (mg *Kafka) GetWriteConnectionSecretToReference() *xpv1.SecretReference { + return mg.Spec.WriteConnectionSecretToReference +} + +// SetConditions of this Kafka. +func (mg *Kafka) SetConditions(c ...xpv1.Condition) { + mg.Status.SetConditions(c...) +} + +// SetDeletionPolicy of this Kafka. +func (mg *Kafka) SetDeletionPolicy(r xpv1.DeletionPolicy) { + mg.Spec.DeletionPolicy = r +} + +// SetProviderConfigReference of this Kafka. +func (mg *Kafka) SetProviderConfigReference(r *xpv1.Reference) { + mg.Spec.ProviderConfigReference = r +} + +/* +SetProviderReference of this Kafka. +Deprecated: Use SetProviderConfigReference. 
+*/ +func (mg *Kafka) SetProviderReference(r *xpv1.Reference) { + mg.Spec.ProviderReference = r +} + +// SetPublishConnectionDetailsTo of this Kafka. +func (mg *Kafka) SetPublishConnectionDetailsTo(r *xpv1.PublishConnectionDetailsTo) { + mg.Spec.PublishConnectionDetailsTo = r +} + +// SetWriteConnectionSecretToReference of this Kafka. +func (mg *Kafka) SetWriteConnectionSecretToReference(r *xpv1.SecretReference) { + mg.Spec.WriteConnectionSecretToReference = r +} + // GetCondition of this MySQL. func (mg *MySQL) GetCondition(ct xpv1.ConditionType) xpv1.Condition { return mg.Status.GetCondition(ct) diff --git a/apis/exoscale/v1/zz_generated.managedlist.go b/apis/exoscale/v1/zz_generated.managedlist.go index 76511616..24b1381e 100644 --- a/apis/exoscale/v1/zz_generated.managedlist.go +++ b/apis/exoscale/v1/zz_generated.managedlist.go @@ -22,6 +22,15 @@ func (l *IAMKeyList) GetItems() []resource.Managed { return items } +// GetItems of this KafkaList. +func (l *KafkaList) GetItems() []resource.Managed { + items := make([]resource.Managed, len(l.Items)) + for i := range l.Items { + items[i] = &l.Items[i] + } + return items +} + // GetItems of this MySQLList. 
func (l *MySQLList) GetItems() []resource.Managed { items := make([]resource.Managed, len(l.Items)) diff --git a/package/crds/exoscale.crossplane.io_kafkas.yaml b/package/crds/exoscale.crossplane.io_kafkas.yaml new file mode 100644 index 00000000..5155ea64 --- /dev/null +++ b/package/crds/exoscale.crossplane.io_kafkas.yaml @@ -0,0 +1,394 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.10.0 + creationTimestamp: null + name: kafkas.exoscale.crossplane.io +spec: + group: exoscale.crossplane.io + names: + categories: + - crossplane + - exoscale + kind: Kafka + listKind: KafkaList + plural: kafkas + singular: kafka + scope: Cluster + versions: + - additionalPrinterColumns: + - jsonPath: .status.conditions[?(@.type=='Ready')].reason + name: State + type: string + - jsonPath: .status.conditions[?(@.type=='Ready')].status + name: Ready + type: string + - jsonPath: .status.conditions[?(@.type=='Synced')].status + name: Synced + type: string + - jsonPath: .metadata.annotations.crossplane\.io/external-name + name: External Name + type: string + - jsonPath: .spec.forProvider.size.plan + name: Plan + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + name: v1 + schema: + openAPIV3Schema: + description: Kafka is the API for creating Kafka. + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. 
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: KafkaSpec defines the desired state of a Kafka. + properties: + deletionPolicy: + default: Delete + description: DeletionPolicy specifies what will happen to the underlying + external when this managed resource is deleted - either "Delete" + or "Orphan" the external resource. + enum: + - Orphan + - Delete + type: string + forProvider: + description: KafkaParameters are the configurable fields of a Kafka + instance. + properties: + ipFilter: + description: IPFilter is a list of allowed IPv4 CIDR ranges that + can access the service. If no IP Filter is set, you may not + be able to reach the service. A value of `0.0.0.0/0` will open + the service to all addresses on the public internet. + items: + type: string + type: array + kafkaSettings: + description: KafkaSettings contains additional Kafka settings. + type: object + x-kubernetes-preserve-unknown-fields: true + maintenance: + description: MaintenanceSpec contains settings to control the + maintenance of an instance. + properties: + dayOfWeek: + description: DayOfWeek specifies at which weekday the maintenance + is held place. Allowed values are [monday, tuesday, wednesday, + thursday, friday, saturday, sunday, never] + enum: + - monday + - tuesday + - wednesday + - thursday + - friday + - saturday + - sunday + - never + type: string + timeOfDay: + description: 'TimeOfDay for installing updates in UTC. Format: + "hh:mm:ss".' + pattern: ^([0-1]?[0-9]|2[0-3]):([0-5][0-9]):([0-5][0-9])$ + type: string + type: object + size: + description: Size contains the service capacity settings. + properties: + plan: + type: string + type: object + terminationProtection: + description: TerminationProtection protects against termination + and powering off. + type: boolean + zone: + description: Zone is the datacenter identifier in which the instance + runs in. 
+ enum: + - ch-gva-2 + - ch-dk-2 + - de-fra-1 + - de-muc-1 + - at-vie-1 + - bg-sof-1 + type: string + required: + - zone + type: object + providerConfigRef: + default: + name: default + description: ProviderConfigReference specifies how the provider that + will be used to create, observe, update, and delete this managed + resource should be configured. + properties: + name: + description: Name of the referenced object. + type: string + policy: + description: Policies for referencing. + properties: + resolution: + default: Required + description: Resolution specifies whether resolution of this + reference is required. The default is 'Required', which + means the reconcile will fail if the reference cannot be + resolved. 'Optional' means this reference will be a no-op + if it cannot be resolved. + enum: + - Required + - Optional + type: string + resolve: + description: Resolve specifies when this reference should + be resolved. The default is 'IfNotPresent', which will attempt + to resolve the reference only when the corresponding field + is not present. Use 'Always' to resolve the reference on + every reconcile. + enum: + - Always + - IfNotPresent + type: string + type: object + required: + - name + type: object + providerRef: + description: 'ProviderReference specifies the provider that will be + used to create, observe, update, and delete this managed resource. + Deprecated: Please use ProviderConfigReference, i.e. `providerConfigRef`' + properties: + name: + description: Name of the referenced object. + type: string + policy: + description: Policies for referencing. + properties: + resolution: + default: Required + description: Resolution specifies whether resolution of this + reference is required. The default is 'Required', which + means the reconcile will fail if the reference cannot be + resolved. 'Optional' means this reference will be a no-op + if it cannot be resolved. 
+ enum: + - Required + - Optional + type: string + resolve: + description: Resolve specifies when this reference should + be resolved. The default is 'IfNotPresent', which will attempt + to resolve the reference only when the corresponding field + is not present. Use 'Always' to resolve the reference on + every reconcile. + enum: + - Always + - IfNotPresent + type: string + type: object + required: + - name + type: object + publishConnectionDetailsTo: + description: PublishConnectionDetailsTo specifies the connection secret + config which contains a name, metadata and a reference to secret + store config to which any connection details for this managed resource + should be written. Connection details frequently include the endpoint, + username, and password required to connect to the managed resource. + properties: + configRef: + default: + name: default + description: SecretStoreConfigRef specifies which secret store + config should be used for this ConnectionSecret. + properties: + name: + description: Name of the referenced object. + type: string + policy: + description: Policies for referencing. + properties: + resolution: + default: Required + description: Resolution specifies whether resolution of + this reference is required. The default is 'Required', + which means the reconcile will fail if the reference + cannot be resolved. 'Optional' means this reference + will be a no-op if it cannot be resolved. + enum: + - Required + - Optional + type: string + resolve: + description: Resolve specifies when this reference should + be resolved. The default is 'IfNotPresent', which will + attempt to resolve the reference only when the corresponding + field is not present. Use 'Always' to resolve the reference + on every reconcile. + enum: + - Always + - IfNotPresent + type: string + type: object + required: + - name + type: object + metadata: + description: Metadata is the metadata for connection secret. 
+ properties: + annotations: + additionalProperties: + type: string + description: Annotations are the annotations to be added to + connection secret. - For Kubernetes secrets, this will be + used as "metadata.annotations". - It is up to Secret Store + implementation for others store types. + type: object + labels: + additionalProperties: + type: string + description: Labels are the labels/tags to be added to connection + secret. - For Kubernetes secrets, this will be used as "metadata.labels". + - It is up to Secret Store implementation for others store + types. + type: object + type: + description: Type is the SecretType for the connection secret. + - Only valid for Kubernetes Secret Stores. + type: string + type: object + name: + description: Name is the name of the connection secret. + type: string + required: + - name + type: object + writeConnectionSecretToRef: + description: WriteConnectionSecretToReference specifies the namespace + and name of a Secret to which any connection details for this managed + resource should be written. Connection details frequently include + the endpoint, username, and password required to connect to the + managed resource. This field is planned to be replaced in a future + release in favor of PublishConnectionDetailsTo. Currently, both + could be set independently and connection details would be published + to both without affecting each other. + properties: + name: + description: Name of the secret. + type: string + namespace: + description: Namespace of the secret. + type: string + required: + - name + - namespace + type: object + required: + - forProvider + type: object + status: + description: KafkaStatus represents the observed state of a Kafka instance. + properties: + atProvider: + description: KafkaObservation are the observable fields of a Kafka. + properties: + kafkaSettings: + description: KafkaSettings contains additional Kafka settings + as set by the provider. 
+ type: object + x-kubernetes-preserve-unknown-fields: true + nodeStates: + description: State of individual service nodes + items: + description: NodeState describes the state of a service node. + properties: + name: + description: Name of the service node + type: string + role: + description: Role of this node. + type: string + state: + description: State of the service node. + type: string + type: object + type: array + notifications: + description: Service notifications + items: + description: Notification contains a service message. + properties: + level: + description: Level of the notification. + type: string + message: + description: Message contains the notification. + type: string + metadata: + description: Metadata contains additional data. + type: object + x-kubernetes-preserve-unknown-fields: true + type: + description: Type of the notification. + type: string + type: object + type: array + version: + type: string + type: object + conditions: + description: Conditions of the resource. + items: + description: A Condition that may apply to a resource. + properties: + lastTransitionTime: + description: LastTransitionTime is the last time this condition + transitioned from one status to another. + format: date-time + type: string + message: + description: A Message containing details about this condition's + last transition from one status to another, if any. + type: string + reason: + description: A Reason for this condition's last transition from + one status to another. + type: string + status: + description: Status of this condition; is it currently True, + False, or Unknown? + type: string + type: + description: Type of this condition. At most one of each condition + type may apply to a resource at any point in time. 
+ type: string + required: + - lastTransitionTime + - reason + - status + - type + type: object + type: array + type: object + required: + - spec + type: object + served: true + storage: true + subresources: + status: {} diff --git a/package/webhook/manifests.yaml b/package/webhook/manifests.yaml index f254e4c8..61c6a91a 100644 --- a/package/webhook/manifests.yaml +++ b/package/webhook/manifests.yaml @@ -45,6 +45,26 @@ webhooks: resources: - iamkeys sideEffects: None +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /validate-exoscale-crossplane-io-v1-kafka + failurePolicy: Fail + name: kafka.exoscale.crossplane.io + rules: + - apiGroups: + - exoscale.crossplane.io + apiVersions: + - v1 + operations: + - CREATE + - UPDATE + resources: + - kafka + sideEffects: None - admissionReviewVersions: - v1 clientConfig: From f85262691aecef99844b0d2132b7e7075fcaa944 Mon Sep 17 00:00:00 2001 From: Fabian Fischer Date: Thu, 24 Nov 2022 15:26:15 +0100 Subject: [PATCH 02/21] Add Kafka Operator --- operator/kafkacontroller/connection.go | 174 ++++++++++++++++++++++++ operator/kafkacontroller/observe.go | 176 +++++++++++++++++++++++++ operator/operator.go | 2 + 3 files changed, 352 insertions(+) create mode 100644 operator/kafkacontroller/connection.go create mode 100644 operator/kafkacontroller/observe.go diff --git a/operator/kafkacontroller/connection.go b/operator/kafkacontroller/connection.go new file mode 100644 index 00000000..2e644644 --- /dev/null +++ b/operator/kafkacontroller/connection.go @@ -0,0 +1,174 @@ +package kafkacontroller + +import ( + "context" + "fmt" + "strings" + "time" + + exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + "github.com/vshn/provider-exoscale/operator/mapper" + "github.com/vshn/provider-exoscale/operator/pipelineutil" + + "github.com/crossplane/crossplane-runtime/pkg/event" + "github.com/crossplane/crossplane-runtime/pkg/logging" + 
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" + "github.com/crossplane/crossplane-runtime/pkg/resource" + exoscalesdk "github.com/exoscale/egoscale/v2" + "github.com/exoscale/egoscale/v2/oapi" + controllerruntime "sigs.k8s.io/controller-runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type connector struct { + kube client.Client + recorder event.Recorder +} + +type connection struct { + kube client.Client + recorder event.Recorder + + exo *exoscalesdk.Client +} + +// Connect implements managed.ExternalConnecter. +func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.ExternalClient, error) { + log := ctrl.LoggerFrom(ctx) + log.V(1).Info("connecting resource") + + kafkaInstance, ok := mg.(*exoscalev1.Kafka) + if !ok { + return nil, fmt.Errorf("invalid managed resource type %T for kafka connector", mg) + } + + exo, err := pipelineutil.OpenExoscaleClient(ctx, c.kube, kafkaInstance.GetProviderConfigName(), exoscalesdk.ClientOptWithAPIEndpoint(fmt.Sprintf("https://api-%s.exoscale.com", kafkaInstance.Spec.ForProvider.Zone))) + if err != nil { + return nil, err + } + return connection{ + kube: c.kube, + recorder: c.recorder, + exo: exo.Exoscale, + }, nil +} + +// SetupController adds a controller that reconciles managed resources. +func SetupController(mgr ctrl.Manager) error { + name := strings.ToLower(exoscalev1.KafkaGroupKind) + recorder := event.NewAPIRecorder(mgr.GetEventRecorderFor(name)) + + return SetupControllerWithConnecter(mgr, name, recorder, &connector{ + kube: mgr.GetClient(), + recorder: recorder, + }, 30*time.Second) +} + +func SetupControllerWithConnecter(mgr ctrl.Manager, name string, recorder event.Recorder, c managed.ExternalConnecter, creationGracePeriod time.Duration) error { + r := createReconciler(mgr, name, recorder, c, creationGracePeriod) + + return ctrl.NewControllerManagedBy(mgr). + Named(name). + For(&exoscalev1.Kafka{}). 
+ Complete(r) +} + +func createReconciler(mgr ctrl.Manager, name string, recorder event.Recorder, c managed.ExternalConnecter, creationGracePeriod time.Duration) *managed.Reconciler { + cps := []managed.ConnectionPublisher{managed.NewAPISecretPublisher(mgr.GetClient(), mgr.GetScheme())} + + return managed.NewReconciler(mgr, + resource.ManagedKind(exoscalev1.KafkaGroupVersionKind), + managed.WithExternalConnecter(c), + managed.WithLogger(logging.NewLogrLogger(mgr.GetLogger().WithValues("controller", name))), + managed.WithRecorder(recorder), + managed.WithPollInterval(1*time.Minute), + managed.WithConnectionPublishers(cps...), + managed.WithCreationGracePeriod(creationGracePeriod)) +} + +// Create implements managed.ExternalClient +func (c connection) Create(ctx context.Context, mg resource.Managed) (managed.ExternalCreation, error) { + log := controllerruntime.LoggerFrom(ctx) + log.V(1).Info("creating resource") + + instance := mg.(*exoscalev1.Kafka) + + spec := instance.Spec.ForProvider + ipFilter := []string(spec.IPFilter) + settings, err := mapper.ToMap(spec.KafkaSettings) + if err != nil { + return managed.ExternalCreation{}, fmt.Errorf("invalid kafka settings: %w", err) + } + + body := oapi.CreateDbaasServiceKafkaJSONRequestBody{ + IpFilter: &ipFilter, + KafkaSettings: &settings, + Maintenance: &struct { + Dow oapi.CreateDbaasServiceKafkaJSONBodyMaintenanceDow "json:\"dow\"" + Time string "json:\"time\"" + }{ + Dow: oapi.CreateDbaasServiceKafkaJSONBodyMaintenanceDow(spec.Maintenance.DayOfWeek), + Time: spec.Maintenance.TimeOfDay.String(), + }, + Plan: spec.Size.Plan, + TerminationProtection: &spec.TerminationProtection, + } + + resp, err := c.exo.CreateDbaasServiceKafkaWithResponse(ctx, oapi.DbaasServiceName(instance.GetInstanceName()), body) + if err != nil { + return managed.ExternalCreation{}, fmt.Errorf("unable to create instance: %w", err) + } + log.V(2).Info("response", "body", string(resp.Body)) + return managed.ExternalCreation{}, nil +} + +// Delete 
implements managed.ExternalClient +func (c connection) Delete(ctx context.Context, mg resource.Managed) error { + log := controllerruntime.LoggerFrom(ctx) + log.V(1).Info("deleting resource") + + instance := mg.(*exoscalev1.Kafka) + resp, err := c.exo.DeleteDbaasServiceWithResponse(ctx, instance.GetInstanceName()) + if err != nil { + return fmt.Errorf("cannot delete kafak instance: %w", err) + } + log.V(2).Info("response", "body", string(resp.Body)) + return nil +} + +// Update implements managed.ExternalClient +func (c connection) Update(ctx context.Context, mg resource.Managed) (managed.ExternalUpdate, error) { + log := controllerruntime.LoggerFrom(ctx) + log.V(1).Info("updating resource") + + instance := mg.(*exoscalev1.Kafka) + + spec := instance.Spec.ForProvider + ipFilter := []string(spec.IPFilter) + settings, err := mapper.ToMap(spec.KafkaSettings) + if err != nil { + return managed.ExternalUpdate{}, fmt.Errorf("invalid kafka settings: %w", err) + } + + body := oapi.UpdateDbaasServiceKafkaJSONRequestBody{ + IpFilter: &ipFilter, + KafkaSettings: &settings, + Maintenance: &struct { + Dow oapi.UpdateDbaasServiceKafkaJSONBodyMaintenanceDow "json:\"dow\"" + Time string "json:\"time\"" + }{ + Dow: oapi.UpdateDbaasServiceKafkaJSONBodyMaintenanceDow(spec.Maintenance.DayOfWeek), + Time: spec.Maintenance.TimeOfDay.String(), + }, + Plan: &spec.Size.Plan, + TerminationProtection: &spec.TerminationProtection, + } + + resp, err := c.exo.UpdateDbaasServiceKafkaWithResponse(ctx, oapi.DbaasServiceName(instance.GetInstanceName()), body) + if err != nil { + return managed.ExternalUpdate{}, fmt.Errorf("unable to update instance: %w", err) + } + log.V(2).Info("response", "body", string(resp.Body)) + return managed.ExternalUpdate{}, nil +} diff --git a/operator/kafkacontroller/observe.go b/operator/kafkacontroller/observe.go new file mode 100644 index 00000000..7a888b3a --- /dev/null +++ b/operator/kafkacontroller/observe.go @@ -0,0 +1,176 @@ +package kafkacontroller + +import ( 
+ "context" + "errors" + "fmt" + "strings" + + xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" + "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" + "github.com/crossplane/crossplane-runtime/pkg/resource" + exoscaleapi "github.com/exoscale/egoscale/v2/api" + "github.com/exoscale/egoscale/v2/oapi" + "github.com/google/go-cmp/cmp" + "k8s.io/utils/pointer" + controllerruntime "sigs.k8s.io/controller-runtime" + + exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + "github.com/vshn/provider-exoscale/operator/mapper" +) + +// Observe implements managed.ExternalClient +func (c connection) Observe(ctx context.Context, mg resource.Managed) (managed.ExternalObservation, error) { + log := controllerruntime.LoggerFrom(ctx) + log.V(1).Info("observing resource") + + instance, ok := mg.(*exoscalev1.Kafka) + if !ok { + return managed.ExternalObservation{}, fmt.Errorf("invalid managed resource type %T for kafka connection", mg) + } + + res, err := c.exo.GetDbaasServiceKafkaWithResponse(ctx, oapi.DbaasServiceName(instance.GetInstanceName())) + if err != nil { + if errors.Is(err, exoscaleapi.ErrNotFound) { + return managed.ExternalObservation{ResourceExists: false}, nil + } + return managed.ExternalObservation{}, err + } + external := res.JSON200 + + instance.Status.AtProvider, err = getObservation(external) + if err != nil { + log.Error(err, "failed to observe kafka instance") + } + + condition, err := getCondition(external) + if err != nil { + log.Error(err, "failed to update kafka condition") + } + instance.SetConditions(condition) + + connDetails, err := getConnectionDetails(external) + if err != nil { + return managed.ExternalObservation{}, fmt.Errorf("failed to get kafka connection details: %w", err) + } + + upToDate, diff := diffParamters(external, instance.Spec.ForProvider) + + return managed.ExternalObservation{ + ResourceExists: true, + ResourceUpToDate: upToDate, + ResourceLateInitialized: false, + ConnectionDetails: 
connDetails, + Diff: diff, + }, nil +} + +func getObservation(external *oapi.DbaasServiceKafka) (exoscalev1.KafkaObservation, error) { + notifications, err := mapper.ToNotifications(external.Notifications) + if err != nil { + return exoscalev1.KafkaObservation{}, err + } + settings, err := mapper.ToRawExtension(external.KafkaSettings) + if err != nil { + return exoscalev1.KafkaObservation{}, err + } + + nodeStates := []exoscalev1.NodeState{} + if external.NodeStates != nil { + nodeStates = mapper.ToNodeStates(external.NodeStates) + } + + return exoscalev1.KafkaObservation{ + Version: pointer.StringDeref(external.Version, ""), + KafkaSettings: settings, + NodeStates: nodeStates, + Notifications: notifications, + }, nil +} +func getCondition(external *oapi.DbaasServiceKafka) (xpv1.Condition, error) { + var state oapi.EnumServiceState + if external.State != nil { + state = *external.State + } + switch state { + case oapi.EnumServiceStateRunning: + return exoscalev1.Running(), nil + case oapi.EnumServiceStateRebuilding: + return exoscalev1.Rebuilding(), nil + case oapi.EnumServiceStatePoweroff: + return exoscalev1.PoweredOff(), nil + case oapi.EnumServiceStateRebalancing: + return exoscalev1.Rebalancing(), nil + default: + return xpv1.Condition{}, fmt.Errorf("unknown state %q", state) + } +} +func getConnectionDetails(external *oapi.DbaasServiceKafka) (map[string][]byte, error) { + if external.ConnectionInfo == nil { + return nil, errors.New("no connection details") + } + nodes := "" + if external.ConnectionInfo.Nodes != nil { + nodes = strings.Join(*external.ConnectionInfo.Nodes, " ") + } + + if external.ConnectionInfo.AccessCert == nil { + return nil, errors.New("no certificate returned") + } + cert := *external.ConnectionInfo.AccessCert + + if external.ConnectionInfo.AccessKey == nil { + return nil, errors.New("no key returned") + } + key := *external.ConnectionInfo.AccessKey + + if external.Uri == nil { + return nil, errors.New("no URI returned") + } + uri := 
*external.Uri + host := "" + port := "" + if external.UriParams != nil { + uriParams := *external.UriParams + host, _ = uriParams["host"].(string) + port, _ = uriParams["port"].(string) + } + + return map[string][]byte{ + "KAFKA_URI": []byte(uri), + "KAFKA_HOST": []byte(host), + "KAFKA_PORT": []byte(port), + "KAFKA_NODES": []byte(nodes), + "cert.pem": []byte(cert), + "key.pem": []byte(key), + }, nil +} + +func diffParamters(external *oapi.DbaasServiceKafka, expected exoscalev1.KafkaParameters) (bool, string) { + actualIPFilter := []string{} + if external.IpFilter != nil { + actualIPFilter = *external.IpFilter + } + actualKafkaSettings, err := mapper.ToRawExtension(external.KafkaRestSettings) + if err != nil { + return false, err.Error() + } + + actual := exoscalev1.KafkaParameters{ + Maintenance: exoscalev1.MaintenanceSpec{ + DayOfWeek: external.Maintenance.Dow, + TimeOfDay: exoscalev1.TimeOfDay(external.Maintenance.Time), + }, + Zone: expected.Zone, + DBaaSParameters: exoscalev1.DBaaSParameters{ + TerminationProtection: pointer.BoolDeref(external.TerminationProtection, false), + Size: exoscalev1.SizeSpec{ + Plan: external.Plan, + }, + IPFilter: actualIPFilter, + }, + KafkaSettings: actualKafkaSettings, + } + + return cmp.Equal(expected, actual), cmp.Diff(expected, actual) +} diff --git a/operator/operator.go b/operator/operator.go index 5649597c..167efe8f 100644 --- a/operator/operator.go +++ b/operator/operator.go @@ -4,6 +4,7 @@ import ( "github.com/vshn/provider-exoscale/operator/bucketcontroller" "github.com/vshn/provider-exoscale/operator/configcontroller" "github.com/vshn/provider-exoscale/operator/iamkeycontroller" + "github.com/vshn/provider-exoscale/operator/kafkacontroller" "github.com/vshn/provider-exoscale/operator/mysqlcontroller" "github.com/vshn/provider-exoscale/operator/postgresqlcontroller" "github.com/vshn/provider-exoscale/operator/rediscontroller" @@ -20,6 +21,7 @@ func SetupControllers(mgr ctrl.Manager) error { 
 mysqlcontroller.SetupController, postgresqlcontroller.SetupController, rediscontroller.SetupController, + kafkacontroller.SetupController, } { if err := setup(mgr); err != nil { return err From ea77d7c672c179e207d526e75899675313246010 Mon Sep 17 00:00:00 2001 From: Fabian Fischer Date: Fri, 25 Nov 2022 11:08:10 +0100 Subject: [PATCH 03/21] Drop unused dependencies --- operator/kafkacontroller/connection.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/operator/kafkacontroller/connection.go b/operator/kafkacontroller/connection.go index 2e644644..db378165 100644 --- a/operator/kafkacontroller/connection.go +++ b/operator/kafkacontroller/connection.go @@ -27,10 +27,7 @@ type connector struct { } type connection struct { - kube client.Client - recorder event.Recorder - - exo *exoscalesdk.Client + exo oapi.ClientWithResponsesInterface } // Connect implements managed.ExternalConnecter. @@ -48,8 +45,6 @@ func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.E return nil, err } return connection{ - kube: c.kube, - recorder: c.recorder, exo: exo.Exoscale, }, nil } From d95bbe482cb60ca1121d2357381a6b902a7cebf4 Mon Sep 17 00:00:00 2001 From: Fabian Fischer Date: Fri, 25 Nov 2022 11:57:50 +0100 Subject: [PATCH 04/21] Add basic unit tests for CRUD operations --- operator/kafkacontroller/connection.go | 17 ++- operator/kafkacontroller/connection_test.go | 140 ++++++++++++++++++++ 2 files changed, 153 insertions(+), 4 deletions(-) create mode 100644 operator/kafkacontroller/connection_test.go diff --git a/operator/kafkacontroller/connection.go b/operator/kafkacontroller/connection.go index db378165..260f9b37 100644 --- a/operator/kafkacontroller/connection.go +++ b/operator/kafkacontroller/connection.go @@ -45,7 +45,7 @@ func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.E return nil, err } return connection{ - exo: exo.Exoscale, + exo: exo.Exoscale, }, nil } @@ -87,7 +87,10 @@ func (c connection) 
Create(ctx context.Context, mg resource.Managed) (managed.Ex log := controllerruntime.LoggerFrom(ctx) log.V(1).Info("creating resource") - instance := mg.(*exoscalev1.Kafka) + instance, ok := mg.(*exoscalev1.Kafka) + if !ok { + return managed.ExternalCreation{}, fmt.Errorf("invalid managed resource type %T for kafka connection", mg) + } spec := instance.Spec.ForProvider ipFilter := []string(spec.IPFilter) @@ -123,7 +126,10 @@ func (c connection) Delete(ctx context.Context, mg resource.Managed) error { log := controllerruntime.LoggerFrom(ctx) log.V(1).Info("deleting resource") - instance := mg.(*exoscalev1.Kafka) + instance, ok := mg.(*exoscalev1.Kafka) + if !ok { + return fmt.Errorf("invalid managed resource type %T for kafka connection", mg) + } resp, err := c.exo.DeleteDbaasServiceWithResponse(ctx, instance.GetInstanceName()) if err != nil { return fmt.Errorf("cannot delete kafak instance: %w", err) @@ -137,7 +143,10 @@ func (c connection) Update(ctx context.Context, mg resource.Managed) (managed.Ex log := controllerruntime.LoggerFrom(ctx) log.V(1).Info("updating resource") - instance := mg.(*exoscalev1.Kafka) + instance, ok := mg.(*exoscalev1.Kafka) + if !ok { + return managed.ExternalUpdate{}, fmt.Errorf("invalid managed resource type %T for kafka connection", mg) + } spec := instance.Spec.ForProvider ipFilter := []string(spec.IPFilter) diff --git a/operator/kafkacontroller/connection_test.go b/operator/kafkacontroller/connection_test.go new file mode 100644 index 00000000..6c4e0800 --- /dev/null +++ b/operator/kafkacontroller/connection_test.go @@ -0,0 +1,140 @@ +package kafkacontroller + +import ( + "context" + "testing" + + "github.com/exoscale/egoscale/v2/oapi" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + "github.com/vshn/provider-exoscale/internal/operatortest" +) + 
+func TestCreate(t *testing.T) { + exoMock := &operatortest.ClientWithResponsesInterface{} + c := connection{ + exo: exoMock, + } + instance := exoscalev1.Kafka{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + } + instance.Spec.ForProvider.Size.Plan = "businesss-8" + instance.Spec.ForProvider.IPFilter = []string{ + "0.0.0.0/0", + } + instance.Spec.ForProvider.Maintenance.DayOfWeek = "monday" + instance.Spec.ForProvider.Maintenance.TimeOfDay = "10:10:10" + ctx := context.Background() + + exoMock.On("CreateDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("foo"), + mock.MatchedBy(func(req oapi.CreateDbaasServiceKafkaJSONRequestBody) bool { + return req.IpFilter != nil && len(*req.IpFilter) == 1 && (*req.IpFilter)[0] == "0.0.0.0/0" && + req.Plan == "businesss-8" && + req.Maintenance != nil && req.Maintenance.Dow == "monday" && req.Maintenance.Time == "10:10:10" + })). + Return(&oapi.CreateDbaasServiceKafkaResponse{Body: []byte{}}, nil). + Once() + + assert.NotPanics(t, func() { + _, err := c.Create(ctx, &instance) + require.NoError(t, err) + }) +} + +func TestCreate_invalidInput(t *testing.T) { + exoMock := &operatortest.ClientWithResponsesInterface{} + c := connection{ + exo: exoMock, + } + ctx := context.Background() + assert.NotPanics(t, func() { + _, err := c.Create(ctx, nil) + assert.Error(t, err) + }) +} + +func TestUpdate(t *testing.T) { + exoMock := &operatortest.ClientWithResponsesInterface{} + c := connection{ + exo: exoMock, + } + instance := exoscalev1.Kafka{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bar", + }, + } + instance.Spec.ForProvider.Size.Plan = "businesss-4" + instance.Spec.ForProvider.IPFilter = []string{ + "1.0.0.0/8", + "2.0.0.0/8", + } + instance.Spec.ForProvider.Maintenance.DayOfWeek = "monday" + instance.Spec.ForProvider.Maintenance.TimeOfDay = "11:11:11" + ctx := context.Background() + + exoMock.On("UpdateDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("bar"), + mock.MatchedBy(func(req 
oapi.UpdateDbaasServiceKafkaJSONRequestBody) bool { + return req.IpFilter != nil && len(*req.IpFilter) == 2 && (*req.IpFilter)[0] == "1.0.0.0/8" && + req.Plan != nil && *req.Plan == "businesss-4" && + req.Maintenance != nil && req.Maintenance.Dow == "monday" && req.Maintenance.Time == "11:11:11" + })). + Return(&oapi.UpdateDbaasServiceKafkaResponse{Body: []byte{}}, nil). + Once() + + assert.NotPanics(t, func() { + _, err := c.Update(ctx, &instance) + require.NoError(t, err) + }) +} + +func TestUpdate_invalidInput(t *testing.T) { + exoMock := &operatortest.ClientWithResponsesInterface{} + c := connection{ + exo: exoMock, + } + ctx := context.Background() + assert.NotPanics(t, func() { + _, err := c.Update(ctx, nil) + assert.Error(t, err) + }) +} + +func TestDelete(t *testing.T) { + exoMock := &operatortest.ClientWithResponsesInterface{} + c := connection{ + exo: exoMock, + } + instance := exoscalev1.Kafka{ + ObjectMeta: metav1.ObjectMeta{ + Name: "buzz", + }, + } + ctx := context.Background() + + exoMock.On("DeleteDbaasServiceWithResponse", mock.Anything, "buzz"). + Return(&oapi.DeleteDbaasServiceResponse{Body: []byte{}}, nil). 
+ Once() + + assert.NotPanics(t, func() { + err := c.Delete(ctx, &instance) + require.NoError(t, err) + }) +} +func TestDelete_invalidInput(t *testing.T) { + exoMock := &operatortest.ClientWithResponsesInterface{} + c := connection{ + exo: exoMock, + } + ctx := context.Background() + assert.NotPanics(t, func() { + err := c.Delete(ctx, nil) + assert.Error(t, err) + }) +} From 7358eab75c9434d51d08a017b2668c33cf129572 Mon Sep 17 00:00:00 2001 From: Fabian Fischer Date: Fri, 25 Nov 2022 13:20:52 +0100 Subject: [PATCH 05/21] Make create and delete idempotent --- operator/kafkacontroller/connection.go | 10 ++++ operator/kafkacontroller/connection_test.go | 51 +++++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/operator/kafkacontroller/connection.go b/operator/kafkacontroller/connection.go index 260f9b37..558f4ac7 100644 --- a/operator/kafkacontroller/connection.go +++ b/operator/kafkacontroller/connection.go @@ -2,6 +2,7 @@ package kafkacontroller import ( "context" + "errors" "fmt" "strings" "time" @@ -15,6 +16,7 @@ import ( "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" "github.com/crossplane/crossplane-runtime/pkg/resource" exoscalesdk "github.com/exoscale/egoscale/v2" + exoscaleapi "github.com/exoscale/egoscale/v2/api" "github.com/exoscale/egoscale/v2/oapi" controllerruntime "sigs.k8s.io/controller-runtime" ctrl "sigs.k8s.io/controller-runtime" @@ -115,6 +117,11 @@ func (c connection) Create(ctx context.Context, mg resource.Managed) (managed.Ex resp, err := c.exo.CreateDbaasServiceKafkaWithResponse(ctx, oapi.DbaasServiceName(instance.GetInstanceName()), body) if err != nil { + if errors.Is(err, exoscaleapi.ErrInvalidRequest) && strings.Contains(err.Error(), "Service name is already taken") { + // According to the ExternalClient Interface, create needs to be idempotent. 
+ // However the exoscale client doesn't return very helpful errors, so we need to make this brittle matching to find if we get an already exists error + return managed.ExternalCreation{}, nil + } return managed.ExternalCreation{}, fmt.Errorf("unable to create instance: %w", err) } log.V(2).Info("response", "body", string(resp.Body)) @@ -132,6 +139,9 @@ func (c connection) Delete(ctx context.Context, mg resource.Managed) error { } resp, err := c.exo.DeleteDbaasServiceWithResponse(ctx, instance.GetInstanceName()) if err != nil { + if errors.Is(err, exoscaleapi.ErrNotFound) { + return nil + } return fmt.Errorf("cannot delete kafak instance: %w", err) } log.V(2).Info("response", "body", string(resp.Body)) diff --git a/operator/kafkacontroller/connection_test.go b/operator/kafkacontroller/connection_test.go index 6c4e0800..2d3aa5aa 100644 --- a/operator/kafkacontroller/connection_test.go +++ b/operator/kafkacontroller/connection_test.go @@ -2,8 +2,10 @@ package kafkacontroller import ( "context" + "fmt" "testing" + exoscaleapi "github.com/exoscale/egoscale/v2/api" "github.com/exoscale/egoscale/v2/oapi" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -48,6 +50,34 @@ func TestCreate(t *testing.T) { }) } +func TestCreate_Idempotent(t *testing.T) { + exoMock := &operatortest.ClientWithResponsesInterface{} + c := connection{ + exo: exoMock, + } + instance := exoscalev1.Kafka{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + } + instance.Spec.ForProvider.Size.Plan = "businesss-8" + instance.Spec.ForProvider.IPFilter = []string{ + "0.0.0.0/0", + } + instance.Spec.ForProvider.Maintenance.DayOfWeek = "monday" + instance.Spec.ForProvider.Maintenance.TimeOfDay = "10:10:10" + ctx := context.Background() + + exoMock.On("CreateDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("foo"), mock.Anything). + Return(nil, fmt.Errorf("%w: Service name is already taken.", exoscaleapi.ErrInvalidRequest)). 
+ Once() + + assert.NotPanics(t, func() { + _, err := c.Create(ctx, &instance) + require.NoError(t, err) + }) +} + func TestCreate_invalidInput(t *testing.T) { exoMock := &operatortest.ClientWithResponsesInterface{} c := connection{ @@ -138,3 +168,24 @@ func TestDelete_invalidInput(t *testing.T) { assert.Error(t, err) }) } +func TestDelete_Idempotent(t *testing.T) { + exoMock := &operatortest.ClientWithResponsesInterface{} + c := connection{ + exo: exoMock, + } + instance := exoscalev1.Kafka{ + ObjectMeta: metav1.ObjectMeta{ + Name: "buzz", + }, + } + ctx := context.Background() + + exoMock.On("DeleteDbaasServiceWithResponse", mock.Anything, "buzz"). + Return(nil, exoscaleapi.ErrNotFound). + Once() + + assert.NotPanics(t, func() { + err := c.Delete(ctx, &instance) + require.NoError(t, err) + }) +} From 9282d0210f638751c2121fb512b98a5bb89aa03b Mon Sep 17 00:00:00 2001 From: Fabian Fischer Date: Fri, 25 Nov 2022 16:45:42 +0100 Subject: [PATCH 06/21] Add webhook for kafka --- apis/exoscale/v1/kafka_types.go | 2 +- operator/kafkacontroller/connection.go | 64 ---------------- operator/kafkacontroller/controller.go | 67 ++++++++++++++++ operator/kafkacontroller/webhook.go | 101 +++++++++++++++++++++++++ operator/operator.go | 1 + package/webhook/manifests.yaml | 2 +- 6 files changed, 171 insertions(+), 66 deletions(-) create mode 100644 operator/kafkacontroller/controller.go create mode 100644 operator/kafkacontroller/webhook.go diff --git a/apis/exoscale/v1/kafka_types.go b/apis/exoscale/v1/kafka_types.go index 5f6e08d2..a550e20f 100644 --- a/apis/exoscale/v1/kafka_types.go +++ b/apis/exoscale/v1/kafka_types.go @@ -59,7 +59,7 @@ type KafkaStatus struct { // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" // +kubebuilder:subresource:status // +kubebuilder:resource:scope=Cluster,categories={crossplane,exoscale} -// 
+kubebuilder:webhook:verbs=create;update,path=/validate-exoscale-crossplane-io-v1-kafka,mutating=false,failurePolicy=fail,groups=exoscale.crossplane.io,resources=kafka,versions=v1,name=kafka.exoscale.crossplane.io,sideEffects=None,admissionReviewVersions=v1 +// +kubebuilder:webhook:verbs=create;update,path=/validate-exoscale-crossplane-io-v1-kafka,mutating=false,failurePolicy=fail,groups=exoscale.crossplane.io,resources=kafkas,versions=v1,name=kafka.exoscale.crossplane.io,sideEffects=None,admissionReviewVersions=v1 // Kafka is the API for creating Kafka. type Kafka struct { diff --git a/operator/kafkacontroller/connection.go b/operator/kafkacontroller/connection.go index 558f4ac7..36615a5f 100644 --- a/operator/kafkacontroller/connection.go +++ b/operator/kafkacontroller/connection.go @@ -5,85 +5,21 @@ import ( "errors" "fmt" "strings" - "time" exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" "github.com/vshn/provider-exoscale/operator/mapper" - "github.com/vshn/provider-exoscale/operator/pipelineutil" - "github.com/crossplane/crossplane-runtime/pkg/event" - "github.com/crossplane/crossplane-runtime/pkg/logging" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" "github.com/crossplane/crossplane-runtime/pkg/resource" - exoscalesdk "github.com/exoscale/egoscale/v2" exoscaleapi "github.com/exoscale/egoscale/v2/api" "github.com/exoscale/egoscale/v2/oapi" controllerruntime "sigs.k8s.io/controller-runtime" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" ) -type connector struct { - kube client.Client - recorder event.Recorder -} - type connection struct { exo oapi.ClientWithResponsesInterface } -// Connect implements managed.ExternalConnecter. 
-func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.ExternalClient, error) { - log := ctrl.LoggerFrom(ctx) - log.V(1).Info("connecting resource") - - kafkaInstance, ok := mg.(*exoscalev1.Kafka) - if !ok { - return nil, fmt.Errorf("invalid managed resource type %T for kafka connector", mg) - } - - exo, err := pipelineutil.OpenExoscaleClient(ctx, c.kube, kafkaInstance.GetProviderConfigName(), exoscalesdk.ClientOptWithAPIEndpoint(fmt.Sprintf("https://api-%s.exoscale.com", kafkaInstance.Spec.ForProvider.Zone))) - if err != nil { - return nil, err - } - return connection{ - exo: exo.Exoscale, - }, nil -} - -// SetupController adds a controller that reconciles managed resources. -func SetupController(mgr ctrl.Manager) error { - name := strings.ToLower(exoscalev1.KafkaGroupKind) - recorder := event.NewAPIRecorder(mgr.GetEventRecorderFor(name)) - - return SetupControllerWithConnecter(mgr, name, recorder, &connector{ - kube: mgr.GetClient(), - recorder: recorder, - }, 30*time.Second) -} - -func SetupControllerWithConnecter(mgr ctrl.Manager, name string, recorder event.Recorder, c managed.ExternalConnecter, creationGracePeriod time.Duration) error { - r := createReconciler(mgr, name, recorder, c, creationGracePeriod) - - return ctrl.NewControllerManagedBy(mgr). - Named(name). - For(&exoscalev1.Kafka{}). 
- Complete(r) -} - -func createReconciler(mgr ctrl.Manager, name string, recorder event.Recorder, c managed.ExternalConnecter, creationGracePeriod time.Duration) *managed.Reconciler { - cps := []managed.ConnectionPublisher{managed.NewAPISecretPublisher(mgr.GetClient(), mgr.GetScheme())} - - return managed.NewReconciler(mgr, - resource.ManagedKind(exoscalev1.KafkaGroupVersionKind), - managed.WithExternalConnecter(c), - managed.WithLogger(logging.NewLogrLogger(mgr.GetLogger().WithValues("controller", name))), - managed.WithRecorder(recorder), - managed.WithPollInterval(1*time.Minute), - managed.WithConnectionPublishers(cps...), - managed.WithCreationGracePeriod(creationGracePeriod)) -} - // Create implements managed.ExternalClient func (c connection) Create(ctx context.Context, mg resource.Managed) (managed.ExternalCreation, error) { log := controllerruntime.LoggerFrom(ctx) diff --git a/operator/kafkacontroller/controller.go b/operator/kafkacontroller/controller.go new file mode 100644 index 00000000..58a2dd26 --- /dev/null +++ b/operator/kafkacontroller/controller.go @@ -0,0 +1,67 @@ +package kafkacontroller + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/crossplane/crossplane-runtime/pkg/event" + "github.com/crossplane/crossplane-runtime/pkg/logging" + "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" + "github.com/crossplane/crossplane-runtime/pkg/resource" + exoscalesdk "github.com/exoscale/egoscale/v2" + exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + "github.com/vshn/provider-exoscale/operator/pipelineutil" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type connector struct { + kube client.Client + recorder event.Recorder +} + +// Connect implements managed.ExternalConnecter. 
+func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.ExternalClient, error) { + log := ctrl.LoggerFrom(ctx) + log.V(1).Info("connecting resource") + + kafkaInstance, ok := mg.(*exoscalev1.Kafka) + if !ok { + return nil, fmt.Errorf("invalid managed resource type %T for kafka connector", mg) + } + + exo, err := pipelineutil.OpenExoscaleClient(ctx, c.kube, kafkaInstance.GetProviderConfigName(), exoscalesdk.ClientOptWithAPIEndpoint(fmt.Sprintf("https://api-%s.exoscale.com", kafkaInstance.Spec.ForProvider.Zone))) + if err != nil { + return nil, err + } + return connection{ + exo: exo.Exoscale, + }, nil +} + +// SetupController adds a controller that reconciles managed resources. +func SetupController(mgr ctrl.Manager) error { + name := strings.ToLower(exoscalev1.KafkaGroupKind) + recorder := event.NewAPIRecorder(mgr.GetEventRecorderFor(name)) + + cps := []managed.ConnectionPublisher{managed.NewAPISecretPublisher(mgr.GetClient(), mgr.GetScheme())} + + r := managed.NewReconciler(mgr, + resource.ManagedKind(exoscalev1.KafkaGroupVersionKind), + managed.WithExternalConnecter(&connector{ + kube: mgr.GetClient(), + recorder: recorder, + }), + managed.WithLogger(logging.NewLogrLogger(mgr.GetLogger().WithValues("controller", name))), + managed.WithRecorder(recorder), + managed.WithPollInterval(1*time.Minute), + managed.WithConnectionPublishers(cps...), + managed.WithCreationGracePeriod(30*time.Second)) + + return ctrl.NewControllerManagedBy(mgr). + Named(name). + For(&exoscalev1.Kafka{}). 
+ Complete(r) +} diff --git a/operator/kafkacontroller/webhook.go b/operator/kafkacontroller/webhook.go new file mode 100644 index 00000000..60504da3 --- /dev/null +++ b/operator/kafkacontroller/webhook.go @@ -0,0 +1,101 @@ +package kafkacontroller + +import ( + "context" + "fmt" + "strings" + + exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + "github.com/vshn/provider-exoscale/operator/webhook" + "go.uber.org/multierr" + ctrl "sigs.k8s.io/controller-runtime" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/runtime" +) + +// SetupWebhook adds a webhook for managed resources. +func SetupWebhook(mgr ctrl.Manager) error { + return ctrl.NewWebhookManagedBy(mgr). + For(&exoscalev1.Kafka{}). + WithValidator(&Validator{ + log: mgr.GetLogger().WithName("webhook").WithName(strings.ToLower(exoscalev1.KafkaKind)), + }). + Complete() +} + +// Validator validates admission requests. +type Validator struct { + log logr.Logger +} + +// ValidateCreate implements admission.CustomValidator. +func (v *Validator) ValidateCreate(_ context.Context, obj runtime.Object) error { + instance, ok := obj.(*exoscalev1.Kafka) + if !ok { + return fmt.Errorf("invalid managed resource type %T for kafka webhook", obj) + } + v.log.V(2).WithValues("instance", instance).Info("validate create") + + return validateSpec(instance.Spec.ForProvider) +} + +// ValidateUpdate implements admission.CustomValidator. 
+func (v *Validator) ValidateUpdate(_ context.Context, oldObj, newObj runtime.Object) error { + newInstance, ok := newObj.(*exoscalev1.Kafka) + if !ok { + return fmt.Errorf("invalid managed resource type %T for kafka webhook", newObj) + } + oldInstance, ok := oldObj.(*exoscalev1.Kafka) + if !ok { + return fmt.Errorf("invalid managed resource type %T for kafka webhook", newObj) + } + v.log.V(2).WithValues("old", oldInstance, "new", newInstance).Info("VALIDATE update") + + err := validateSpec(newInstance.Spec.ForProvider) + if err != nil { + return err + } + return validateImmutable(oldInstance.Spec.ForProvider, newInstance.Spec.ForProvider) +} + +// ValidateDelete implements admission.CustomValidator. +func (v *Validator) ValidateDelete(_ context.Context, obj runtime.Object) error { + v.log.V(2).Info("validate delete (noop)") + return nil +} + +func validateSpec(params exoscalev1.KafkaParameters) error { + return multierr.Combine( + validateIpFilter(params), + validateMaintenanceSchedule(params), + validateKafkaSettings(params), + ) +} + +func validateIpFilter(params exoscalev1.KafkaParameters) error { + if len(params.IPFilter) == 0 { + return fmt.Errorf("IP filter cannot be empty") + } + return nil +} + +func validateMaintenanceSchedule(params exoscalev1.KafkaParameters) error { + _, _, _, err := params.Maintenance.TimeOfDay.Parse() + return err +} + +func validateKafkaSettings(obj exoscalev1.KafkaParameters) error { + return webhook.ValidateRawExtension(obj.KafkaSettings) +} + +func validateImmutable(oldParams, newParams exoscalev1.KafkaParameters) error { + return compareZone(oldParams, newParams) +} + +func compareZone(oldParams, newParams exoscalev1.KafkaParameters) error { + if oldParams.Zone != newParams.Zone { + return fmt.Errorf("field is immutable: %s (old), %s (changed)", oldParams.Zone, newParams.Zone) + } + return nil +} diff --git a/operator/operator.go b/operator/operator.go index 167efe8f..7d4c6eea 100644 --- a/operator/operator.go +++ 
b/operator/operator.go @@ -38,6 +38,7 @@ func SetupWebhooks(mgr ctrl.Manager) error { mysqlcontroller.SetupWebhook, postgresqlcontroller.SetupWebhook, rediscontroller.SetupWebhook, + kafkacontroller.SetupWebhook, } { if err := setup(mgr); err != nil { return err diff --git a/package/webhook/manifests.yaml b/package/webhook/manifests.yaml index 61c6a91a..ee9b5b45 100644 --- a/package/webhook/manifests.yaml +++ b/package/webhook/manifests.yaml @@ -63,7 +63,7 @@ webhooks: - CREATE - UPDATE resources: - - kafka + - kafkas sideEffects: None - admissionReviewVersions: - v1 From 5ce5f843069a5afeebf3d74912f92ccd66d2ee84 Mon Sep 17 00:00:00 2001 From: Fabian Fischer Date: Mon, 28 Nov 2022 11:24:29 +0100 Subject: [PATCH 07/21] Add tests for observer --- operator/kafkacontroller/observe_test.go | 303 +++++++++++++++++++++++ 1 file changed, 303 insertions(+) create mode 100644 operator/kafkacontroller/observe_test.go diff --git a/operator/kafkacontroller/observe_test.go b/operator/kafkacontroller/observe_test.go new file mode 100644 index 00000000..f2e2511c --- /dev/null +++ b/operator/kafkacontroller/observe_test.go @@ -0,0 +1,303 @@ +package kafkacontroller + +import ( + "context" + "testing" + + xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" + "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" + exoscaleapi "github.com/exoscale/egoscale/v2/api" + "github.com/exoscale/egoscale/v2/oapi" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + "github.com/vshn/provider-exoscale/internal/operatortest" + "github.com/vshn/provider-exoscale/operator/mapper" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" +) + +func TestObserve_NotExits(t *testing.T) { + exoMock := &operatortest.ClientWithResponsesInterface{} + c := connection{ + exo: exoMock, + } + instance := 
sampleKafka("foo") + + ctx := context.Background() + + exoMock.On("GetDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("foo")). + Return(&oapi.GetDbaasServiceKafkaResponse{Body: []byte{}}, exoscaleapi.ErrNotFound). + Once() + + assert.NotPanics(t, func() { + res, err := c.Observe(ctx, &instance) + assert.NoError(t, err) + require.NotNil(t, res) + assert.False(t, res.ResourceExists, "report resource not exits") + }) +} + +func TestObserve_UpToDate_ConnectionDetails(t *testing.T) { + exoMock := &operatortest.ClientWithResponsesInterface{} + c := connection{ + exo: exoMock, + } + instance := sampleKafka("foo") + found := sampleAPIKafka("foo") + found.Uri = pointer.String("foobar.com:21701") + found.UriParams = &map[string]interface{}{ + "host": "foobar.com", + "port": "21701", + } + found.ConnectionInfo.Nodes = &[]string{ + "10.10.1.1:21701", + "10.10.1.2:21701", + "10.10.1.3:21701", + } + found.ConnectionInfo.AccessCert = pointer.String("CERT") + found.ConnectionInfo.AccessKey = pointer.String("KEY") + + ctx := context.Background() + + exoMock.On("GetDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("foo")). + Return(&oapi.GetDbaasServiceKafkaResponse{ + Body: []byte{}, + JSON200: found, + }, nil). 
+ Once() + + assert.NotPanics(t, func() { + res, err := c.Observe(ctx, &instance) + assert.NoError(t, err) + require.NotNil(t, res) + assert.True(t, res.ResourceExists, "report resource exits") + assert.True(t, res.ResourceUpToDate, "report resource uptodate") + require.NotNil(t, res.ConnectionDetails) + expectedConnDetails := managed.ConnectionDetails{ + "KAFKA_URI": []byte("foobar.com:21701"), + "KAFKA_HOST": []byte("foobar.com"), + "KAFKA_PORT": []byte("21701"), + "KAFKA_NODES": []byte("10.10.1.1:21701 10.10.1.2:21701 10.10.1.3:21701"), + "cert.pem": []byte("CERT"), + "key.pem": []byte("KEY"), + } + assert.Equal(t, expectedConnDetails, res.ConnectionDetails) + }) +} + +func TestObserve_UpToDate_Status(t *testing.T) { + exoMock := &operatortest.ClientWithResponsesInterface{} + c := connection{ + exo: exoMock, + } + instance := sampleKafka("foo") + found := sampleAPIKafka("foo") + found.Version = pointer.String("3.2.1") + found.NodeStates = &[]oapi.DbaasNodeState{ + { + Name: "node-1", + State: "running", + }, + { + Name: "node-3", + State: "leaving", + }, + } + + ctx := context.Background() + exoMock.On("GetDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("foo")). + Return(&oapi.GetDbaasServiceKafkaResponse{ + Body: []byte{}, + JSON200: found, + }, nil). 
+ Once() + assert.NotPanics(t, func() { + res, err := c.Observe(ctx, &instance) + assert.NoError(t, err) + require.NotNil(t, res) + assert.True(t, res.ResourceExists, "report resource exits") + assert.True(t, res.ResourceUpToDate, "report resource uptodate") + + assert.Equal(t, "3.2.1", instance.Status.AtProvider.Version) + require.Len(t, instance.Status.AtProvider.NodeStates, 2, "expect 2 node states") + assert.Equal(t, "node-1", instance.Status.AtProvider.NodeStates[0].Name) + assert.EqualValues(t, "running", instance.Status.AtProvider.NodeStates[0].State) + assert.EqualValues(t, "leaving", instance.Status.AtProvider.NodeStates[1].State) + }) +} + +func TestObserve_UpToDate_Condition_NotReady(t *testing.T) { + exoMock := &operatortest.ClientWithResponsesInterface{} + c := connection{ + exo: exoMock, + } + instance := sampleKafka("foo") + found := sampleAPIKafka("foo") + state := oapi.EnumServiceStateRebalancing + found.State = &state + + ctx := context.Background() + exoMock.On("GetDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("foo")). + Return(&oapi.GetDbaasServiceKafkaResponse{ + Body: []byte{}, + JSON200: found, + }, nil). 
+ Once() + + assert.NotPanics(t, func() { + res, err := c.Observe(ctx, &instance) + assert.NoError(t, err) + require.NotNil(t, res) + assert.True(t, res.ResourceExists, "report resource exits") + assert.True(t, res.ResourceUpToDate, "report resource uptodate") + + readyState := instance.Status.ConditionedStatus.GetCondition(xpv1.TypeReady) + + assert.Equal(t, corev1.ConditionFalse, readyState.Status) + assert.EqualValues(t, "Rebalancing", readyState.Reason) + }) +} + +func TestObserve_UpToDate_Condition_Ready(t *testing.T) { + exoMock := &operatortest.ClientWithResponsesInterface{} + c := connection{ + exo: exoMock, + } + instance := sampleKafka("foo") + found := sampleAPIKafka("foo") + state := oapi.EnumServiceStateRunning + found.State = &state + + ctx := context.Background() + exoMock.On("GetDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("foo")). + Return(&oapi.GetDbaasServiceKafkaResponse{ + Body: []byte{}, + JSON200: found, + }, nil). + Once() + + assert.NotPanics(t, func() { + res, err := c.Observe(ctx, &instance) + assert.NoError(t, err) + require.NotNil(t, res) + assert.True(t, res.ResourceExists, "report resource exits") + assert.True(t, res.ResourceUpToDate, "report resource uptodate") + + readyState := instance.Status.ConditionedStatus.GetCondition(xpv1.TypeReady) + + assert.Equal(t, corev1.ConditionTrue, readyState.Status) + }) +} + +func TestObserve_Outdated(t *testing.T) { + exoMock := &operatortest.ClientWithResponsesInterface{} + c := connection{ + exo: exoMock, + } + instance := sampleKafka("foo") + found := sampleAPIKafka("foo") + found.Maintenance.Dow = "tuesday" + + ctx := context.Background() + exoMock.On("GetDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("foo")). + Return(&oapi.GetDbaasServiceKafkaResponse{ + Body: []byte{}, + JSON200: found, + }, nil). 
+ Once() + + assert.NotPanics(t, func() { + res, err := c.Observe(ctx, &instance) + assert.NoError(t, err) + require.NotNil(t, res) + assert.True(t, res.ResourceExists, "report resource exits") + assert.False(t, res.ResourceUpToDate, "report resource not uptodate") + }) +} + +func TestObserve_Outdated_Settings(t *testing.T) { + exoMock := &operatortest.ClientWithResponsesInterface{} + c := connection{ + exo: exoMock, + } + instance := sampleKafka("foo") + setting, _ := mapper.ToRawExtension(&map[string]interface{}{ + "count": 1, + "foo": "bar", + }) + instance.Spec.ForProvider.KafkaSettings = setting + found := sampleAPIKafka("foo") + found.KafkaRestSettings = &map[string]interface{}{ + "foo": "bar", + "count": 2, + } + + ctx := context.Background() + exoMock.On("GetDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("foo")). + Return(&oapi.GetDbaasServiceKafkaResponse{ + Body: []byte{}, + JSON200: found, + }, nil). + Once() + + assert.NotPanics(t, func() { + res, err := c.Observe(ctx, &instance) + assert.NoError(t, err) + require.NotNil(t, res) + assert.True(t, res.ResourceExists, "report resource exits") + assert.False(t, res.ResourceUpToDate, "report resource not uptodate") + }) +} + +func sampleKafka(name string) exoscalev1.Kafka { + instance := exoscalev1.Kafka{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + } + instance.Spec.ForProvider.Size.Plan = "businesss-8" + instance.Spec.ForProvider.IPFilter = []string{ + "0.0.0.0/0", + } + instance.Spec.ForProvider.Maintenance.DayOfWeek = "monday" + instance.Spec.ForProvider.Maintenance.TimeOfDay = "10:10:10" + return instance +} + +func sampleAPIKafka(name string) *oapi.DbaasServiceKafka { + res := oapi.DbaasServiceKafka{} + + res.Name = oapi.DbaasServiceName(name) + res.Plan = "businesss-8" + res.IpFilter = &[]string{"0.0.0.0/0"} + res.Maintenance = &oapi.DbaasServiceMaintenance{ + Dow: "monday", + Time: "10:10:10", + } + + nodes := []string{"194.182.160.164:21701", + 
"159.100.244.100:21701", + "159.100.241.65:21701", + } + res.ConnectionInfo = &struct { + AccessCert *string "json:\"access-cert,omitempty\"" + AccessKey *string "json:\"access-key,omitempty\"" + Nodes *[]string "json:\"nodes,omitempty\"" + RegistryUri *string "json:\"registry-uri,omitempty\"" + RestUri *string "json:\"rest-uri,omitempty\"" + }{ + AccessCert: pointer.String("SOME ACCESS CERT"), + AccessKey: pointer.String("SOME ACCESS KEY"), + Nodes: &nodes, + } + + res.Uri = pointer.String("foo-exoscale-8fa13713-1027-4b9c-bca7-4c14f9ff9928.aivencloud.com") + res.UriParams = &map[string]interface{}{} + + res.Version = pointer.String("3.2.1") + + return &res +} From bfb113a14ea9491e2e02c78ef1ecf57867e6125e Mon Sep 17 00:00:00 2001 From: Fabian Fischer Date: Mon, 28 Nov 2022 11:59:33 +0100 Subject: [PATCH 08/21] Add webhook tests --- operator/kafkacontroller/observe_test.go | 9 +-- operator/kafkacontroller/webhook_test.go | 73 ++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 4 deletions(-) create mode 100644 operator/kafkacontroller/webhook_test.go diff --git a/operator/kafkacontroller/observe_test.go b/operator/kafkacontroller/observe_test.go index f2e2511c..5d18651f 100644 --- a/operator/kafkacontroller/observe_test.go +++ b/operator/kafkacontroller/observe_test.go @@ -225,14 +225,14 @@ func TestObserve_Outdated_Settings(t *testing.T) { } instance := sampleKafka("foo") setting, _ := mapper.ToRawExtension(&map[string]interface{}{ - "count": 1, - "foo": "bar", + "count": 1, + "foo": "bar", }) instance.Spec.ForProvider.KafkaSettings = setting found := sampleAPIKafka("foo") found.KafkaRestSettings = &map[string]interface{}{ - "foo": "bar", - "count": 2, + "foo": "bar", + "count": 2, } ctx := context.Background() @@ -264,6 +264,7 @@ func sampleKafka(name string) exoscalev1.Kafka { } instance.Spec.ForProvider.Maintenance.DayOfWeek = "monday" instance.Spec.ForProvider.Maintenance.TimeOfDay = "10:10:10" + instance.Spec.ForProvider.Zone = "ch-dk-2" return 
instance } diff --git a/operator/kafkacontroller/webhook_test.go b/operator/kafkacontroller/webhook_test.go new file mode 100644 index 00000000..382154a4 --- /dev/null +++ b/operator/kafkacontroller/webhook_test.go @@ -0,0 +1,73 @@ +package kafkacontroller + +import ( + "context" + "testing" + + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" +) + +func TestWebhook_Create(t *testing.T) { + ctx := context.TODO() + v := Validator{ + log: logr.Discard(), + } + + base := sampleKafka("foo") + + t.Run("valid", func(t *testing.T) { + err := v.ValidateCreate(ctx, &base) + assert.NoError(t, err) + }) + t.Run("invalid empty", func(t *testing.T) { + err := v.ValidateCreate(ctx, &exoscalev1.Kafka{}) + assert.Error(t, err) + }) + t.Run("invalid no ipfilter", func(t *testing.T) { + inst := base + inst.Spec.ForProvider.IPFilter = nil + err := v.ValidateCreate(ctx, &inst) + assert.Error(t, err) + }) + t.Run("invalid no time", func(t *testing.T) { + inst := base + inst.Spec.ForProvider.Maintenance.TimeOfDay = "" + err := v.ValidateCreate(ctx, &inst) + assert.Error(t, err) + }) +} + +func TestWebhook_Update(t *testing.T) { + ctx := context.TODO() + v := Validator{ + log: logr.Discard(), + } + + base := sampleKafka("foo") + + t.Run("no change", func(t *testing.T) { + err := v.ValidateUpdate(ctx, &base, &base) + assert.NoError(t, err) + }) + t.Run("valid change", func(t *testing.T) { + inst := base + inst.Spec.ForProvider.IPFilter = []string{"10.10.1.1/24", "10.10.2.1/24"} + err := v.ValidateUpdate(ctx, &base, &inst) + assert.NoError(t, err) + }) + t.Run("remove ipfilter", func(t *testing.T) { + inst := base + inst.Spec.ForProvider.IPFilter = nil + err := v.ValidateUpdate(ctx, &base, &inst) + assert.Error(t, err) + }) + t.Run("change zone", func(t *testing.T) { + inst := base + inst.Spec.ForProvider.Zone = "ch-gva-2" + err := v.ValidateUpdate(ctx, &base, &inst) + assert.Error(t, err) + }) + +} From 
e044c39270897b21271d1658a53b5ab6d5f22517 Mon Sep 17 00:00:00 2001 From: Fabian Fischer Date: Mon, 28 Nov 2022 16:25:33 +0100 Subject: [PATCH 09/21] Add basic e2e tests --- test/e2e/kafka/00-assert.yaml | 48 ++++++++++++++++++++++++++++++++++ test/e2e/kafka/00-install.yaml | 19 ++++++++++++++ test/e2e/kafka/02-delete.yaml | 7 +++++ 3 files changed, 74 insertions(+) create mode 100644 test/e2e/kafka/00-assert.yaml create mode 100644 test/e2e/kafka/00-install.yaml create mode 100644 test/e2e/kafka/02-delete.yaml diff --git a/test/e2e/kafka/00-assert.yaml b/test/e2e/kafka/00-assert.yaml new file mode 100644 index 00000000..e8577df6 --- /dev/null +++ b/test/e2e/kafka/00-assert.yaml @@ -0,0 +1,48 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 600 +--- +apiVersion: exoscale.crossplane.io/v1 +kind: Kafka +metadata: + annotations: + crossplane.io/external-name: e2e-test-kafka + finalizers: + - finalizer.managedresource.crossplane.io + name: e2e-test-kafka +spec: + deletionPolicy: Delete + forProvider: + ipFilter: + - 0.0.0.0/0 + maintenance: + dayOfWeek: monday + timeOfDay: "12:00:00" + size: + plan: startup-2 + zone: ch-dk-2 + providerConfigRef: + name: provider-config + writeConnectionSecretToRef: + name: e2e-test-kafka-details + namespace: default +status: + atProvider: + nodeStates: + - state: running + - state: running + - state: running + conditions: + - status: "True" + - status: "True" +--- +apiVersion: v1 +kind: Secret +type: connection.crossplane.io/v1alpha1 +metadata: + name: e2e-test-kafka-details + namespace: default + ownerReferences: + - apiVersion: exoscale.crossplane.io/v1 + kind: Kafka + name: e2e-test-kafka diff --git a/test/e2e/kafka/00-install.yaml b/test/e2e/kafka/00-install.yaml new file mode 100644 index 00000000..fa751995 --- /dev/null +++ b/test/e2e/kafka/00-install.yaml @@ -0,0 +1,19 @@ +apiVersion: exoscale.crossplane.io/v1 +kind: Kafka +metadata: + name: e2e-test-kafka +spec: + forProvider: + ipFilter: + - 0.0.0.0/0 + maintenance: 
+ dayOfWeek: monday + timeOfDay: "12:00:00" + size: + plan: startup-2 + zone: ch-dk-2 + providerConfigRef: + name: provider-config + writeConnectionSecretToRef: + name: e2e-test-kafka-details + namespace: default diff --git a/test/e2e/kafka/02-delete.yaml b/test/e2e/kafka/02-delete.yaml new file mode 100644 index 00000000..55d55fe7 --- /dev/null +++ b/test/e2e/kafka/02-delete.yaml @@ -0,0 +1,7 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +delete: + # This will wait until resources are really gone + - apiVersion: exoscale.crossplane.io/v1 + kind: kafka + name: e2e-test-kafka From 79c54989a6cdc9b448c5a296b0ca48c65fac992e Mon Sep 17 00:00:00 2001 From: Fabian Fischer Date: Mon, 28 Nov 2022 16:42:51 +0100 Subject: [PATCH 10/21] Append certificat authoritiy to connection secret --- operator/kafkacontroller/observe.go | 14 +++++- operator/kafkacontroller/observe_test.go | 56 ++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 2 deletions(-) diff --git a/operator/kafkacontroller/observe.go b/operator/kafkacontroller/observe.go index 7a888b3a..1c348f4e 100644 --- a/operator/kafkacontroller/observe.go +++ b/operator/kafkacontroller/observe.go @@ -49,7 +49,16 @@ func (c connection) Observe(ctx context.Context, mg resource.Managed) (managed.E } instance.SetConditions(condition) - connDetails, err := getConnectionDetails(external) + caRes, err := c.exo.GetDbaasCaCertificateWithResponse(ctx) + if err != nil { + return managed.ExternalObservation{}, fmt.Errorf("cannot retrieve CA certificate: %w", err) + } + ca := "" + if caRes.JSON200 != nil && caRes.JSON200.Certificate != nil { + ca = *caRes.JSON200.Certificate + } + + connDetails, err := getConnectionDetails(external, ca) if err != nil { return managed.ExternalObservation{}, fmt.Errorf("failed to get kafka connection details: %w", err) } @@ -105,7 +114,7 @@ func getCondition(external *oapi.DbaasServiceKafka) (xpv1.Condition, error) { return xpv1.Condition{}, fmt.Errorf("unknown state %q", state) } } -func 
getConnectionDetails(external *oapi.DbaasServiceKafka) (map[string][]byte, error) { +func getConnectionDetails(external *oapi.DbaasServiceKafka, ca string) (map[string][]byte, error) { if external.ConnectionInfo == nil { return nil, errors.New("no connection details") } @@ -143,6 +152,7 @@ func getConnectionDetails(external *oapi.DbaasServiceKafka) (map[string][]byte, "KAFKA_NODES": []byte(nodes), "cert.pem": []byte(cert), "key.pem": []byte(key), + "ca.crt": []byte(ca), }, nil } diff --git a/operator/kafkacontroller/observe_test.go b/operator/kafkacontroller/observe_test.go index 5d18651f..8bd73325 100644 --- a/operator/kafkacontroller/observe_test.go +++ b/operator/kafkacontroller/observe_test.go @@ -68,6 +68,15 @@ func TestObserve_UpToDate_ConnectionDetails(t *testing.T) { JSON200: found, }, nil). Once() + exoMock.On("GetDbaasCaCertificateWithResponse", mock.Anything). + Return(&oapi.GetDbaasCaCertificateResponse{ + JSON200: &struct { + Certificate *string "json:\"certificate,omitempty\"" + }{ + Certificate: pointer.String("CA"), + }, + }, nil). + Once() assert.NotPanics(t, func() { res, err := c.Observe(ctx, &instance) @@ -83,6 +92,7 @@ func TestObserve_UpToDate_ConnectionDetails(t *testing.T) { "KAFKA_NODES": []byte("10.10.1.1:21701 10.10.1.2:21701 10.10.1.3:21701"), "cert.pem": []byte("CERT"), "key.pem": []byte("KEY"), + "ca.crt": []byte("CA"), } assert.Equal(t, expectedConnDetails, res.ConnectionDetails) }) @@ -114,6 +124,16 @@ func TestObserve_UpToDate_Status(t *testing.T) { JSON200: found, }, nil). Once() + exoMock.On("GetDbaasCaCertificateWithResponse", mock.Anything). + Return(&oapi.GetDbaasCaCertificateResponse{ + JSON200: &struct { + Certificate *string "json:\"certificate,omitempty\"" + }{ + Certificate: pointer.String("CA"), + }, + }, nil). + Once() + assert.NotPanics(t, func() { res, err := c.Observe(ctx, &instance) assert.NoError(t, err) @@ -146,6 +166,15 @@ func TestObserve_UpToDate_Condition_NotReady(t *testing.T) { JSON200: found, }, nil). 
Once() + exoMock.On("GetDbaasCaCertificateWithResponse", mock.Anything). + Return(&oapi.GetDbaasCaCertificateResponse{ + JSON200: &struct { + Certificate *string "json:\"certificate,omitempty\"" + }{ + Certificate: pointer.String("CA"), + }, + }, nil). + Once() assert.NotPanics(t, func() { res, err := c.Observe(ctx, &instance) @@ -178,6 +207,15 @@ func TestObserve_UpToDate_Condition_Ready(t *testing.T) { JSON200: found, }, nil). Once() + exoMock.On("GetDbaasCaCertificateWithResponse", mock.Anything). + Return(&oapi.GetDbaasCaCertificateResponse{ + JSON200: &struct { + Certificate *string "json:\"certificate,omitempty\"" + }{ + Certificate: pointer.String("CA"), + }, + }, nil). + Once() assert.NotPanics(t, func() { res, err := c.Observe(ctx, &instance) @@ -208,6 +246,15 @@ func TestObserve_Outdated(t *testing.T) { JSON200: found, }, nil). Once() + exoMock.On("GetDbaasCaCertificateWithResponse", mock.Anything). + Return(&oapi.GetDbaasCaCertificateResponse{ + JSON200: &struct { + Certificate *string "json:\"certificate,omitempty\"" + }{ + Certificate: pointer.String("CA"), + }, + }, nil). + Once() assert.NotPanics(t, func() { res, err := c.Observe(ctx, &instance) @@ -242,6 +289,15 @@ func TestObserve_Outdated_Settings(t *testing.T) { JSON200: found, }, nil). Once() + exoMock.On("GetDbaasCaCertificateWithResponse", mock.Anything). + Return(&oapi.GetDbaasCaCertificateResponse{ + JSON200: &struct { + Certificate *string "json:\"certificate,omitempty\"" + }{ + Certificate: pointer.String("CA"), + }, + }, nil). 
+ Once() assert.NotPanics(t, func() { res, err := c.Observe(ctx, &instance) From 61519f18f654b0f3a142f292719d4b228ca3a529 Mon Sep 17 00:00:00 2001 From: Fabian Fischer Date: Mon, 28 Nov 2022 17:52:03 +0100 Subject: [PATCH 11/21] Add connect e2e tests --- test/e2e/kafka/01-assert.yaml | 11 +++++++ test/e2e/kafka/01-connect.yaml | 52 ++++++++++++++++++++++++++++++++++ test/e2e/kafka/02-delete.yaml | 7 +++++ 3 files changed, 70 insertions(+) create mode 100644 test/e2e/kafka/01-assert.yaml create mode 100644 test/e2e/kafka/01-connect.yaml diff --git a/test/e2e/kafka/01-assert.yaml b/test/e2e/kafka/01-assert.yaml new file mode 100644 index 00000000..f1c4f5f2 --- /dev/null +++ b/test/e2e/kafka/01-assert.yaml @@ -0,0 +1,11 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: connect-kafka + namespace: default +status: + conditions: + - type: Complete + status: 'True' + succeeded: 1 + ready: 0 diff --git a/test/e2e/kafka/01-connect.yaml b/test/e2e/kafka/01-connect.yaml new file mode 100644 index 00000000..93185d23 --- /dev/null +++ b/test/e2e/kafka/01-connect.yaml @@ -0,0 +1,52 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: e2e-test-kafka-config + namespace: default +data: + kaf.config: | + clusters: + - name: test + TLS: + cafile: /.kafka/certs/ca.crt + clientfile: /.kafka/certs/cert.pem + clientkeyfile: /.kafka/certs/key.pem + insecure: false +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: connect-kafka + namespace: default +spec: + backoffLimit: 5 + template: + metadata: + labels: + e2e-test: kafka + spec: + restartPolicy: Never + containers: + - name: connect + image: golang:1.19 + imagePullPolicy: IfNotPresent + command: + - bash + args: + - -c + - echo "Testing create topic...\n" && go install github.com/birdayz/kaf/cmd/kaf@v0.2.3 && kaf -b $KAFKA_URI --config /.kafka/config/kaf.config -c test topic create test + envFrom: + - secretRef: + name: e2e-test-kafka-details + volumeMounts: + - name: certs + mountPath: /.kafka/certs + - name: config + 
mountPath: /.kafka/config + volumes: + - name: certs + secret: + secretName: e2e-test-kafka-details + - name: config + configMap: + name: e2e-test-kafka-config diff --git a/test/e2e/kafka/02-delete.yaml b/test/e2e/kafka/02-delete.yaml index 55d55fe7..35ca6153 100644 --- a/test/e2e/kafka/02-delete.yaml +++ b/test/e2e/kafka/02-delete.yaml @@ -2,6 +2,13 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep delete: # This will wait until resources are really gone + - apiVersion: batch/v1 + kind: Job + name: connect-kafka + - apiVersion: v1 + kind: Pod + labels: + e2e-test: kafka - apiVersion: exoscale.crossplane.io/v1 kind: kafka name: e2e-test-kafka From 53275efa3102a19d52a012c24f647200b308f256 Mon Sep 17 00:00:00 2001 From: Fabian Fischer Date: Tue, 29 Nov 2022 09:31:06 +0100 Subject: [PATCH 12/21] Add sample for kafka --- .../ROOT/examples/exoscale_iamkey.yaml | 8 ++--- generate_sample.go | 35 +++++++++++++++++++ samples/exoscale.crossplane.io_kafka.yaml | 25 +++++++++++++ 3 files changed, 64 insertions(+), 4 deletions(-) create mode 100644 samples/exoscale.crossplane.io_kafka.yaml diff --git a/docs/modules/ROOT/examples/exoscale_iamkey.yaml b/docs/modules/ROOT/examples/exoscale_iamkey.yaml index a89a3477..09172ca8 100644 --- a/docs/modules/ROOT/examples/exoscale_iamkey.yaml +++ b/docs/modules/ROOT/examples/exoscale_iamkey.yaml @@ -1,17 +1,17 @@ apiVersion: exoscale.crossplane.io/v1 kind: IAMKey metadata: - name: my-exoscale-iam-key + name: iam-key-local-dev spec: forProvider: - keyName: iam-key + keyName: iam-key-local-dev services: sos: buckets: - - bucket-test-1 + - bucket-local-dev zone: CH-DK-2 providerConfigRef: name: provider-config writeConnectionSecretToRef: - name: my-exoscale-iam-key + name: my-exoscale-user-credentials namespace: default diff --git a/generate_sample.go b/generate_sample.go index 44032766..c7a01ead 100644 --- a/generate_sample.go +++ b/generate_sample.go @@ -41,6 +41,7 @@ func main() { generateMysqlSample() generatePostgresqlSample() 
generateRedisSample() + generateKafkaSample() } func generatePostgresqlSample() { @@ -268,6 +269,40 @@ func newRedisSample() *exoscalev1.Redis { }, } } +func generateKafkaSample() { + spec := newKafkaSample() + serialize(spec, true) +} + +func newKafkaSample() *exoscalev1.Kafka { + return &exoscalev1.Kafka{ + TypeMeta: metav1.TypeMeta{ + APIVersion: exoscalev1.KafkaGroupVersionKind.GroupVersion().String(), + Kind: exoscalev1.KafkaKind, + }, + ObjectMeta: metav1.ObjectMeta{Name: "kafka-local-dev"}, + Spec: exoscalev1.KafkaSpec{ + ResourceSpec: xpv1.ResourceSpec{ + ProviderConfigReference: &xpv1.Reference{Name: "provider-config"}, + WriteConnectionSecretToReference: &xpv1.SecretReference{Name: "kafka-local-dev-details", Namespace: "default"}, + }, + ForProvider: exoscalev1.KafkaParameters{ + Maintenance: exoscalev1.MaintenanceSpec{ + TimeOfDay: "12:00:00", + DayOfWeek: exoscaleoapi.DbaasServiceMaintenanceDowMonday, + }, + Zone: "ch-dk-2", + DBaaSParameters: exoscalev1.DBaaSParameters{ + Size: exoscalev1.SizeSpec{ + Plan: "startup-2", + }, + IPFilter: exoscalev1.IPFilter{"0.0.0.0/0"}, + }, + KafkaSettings: runtime.RawExtension{Raw: []byte(`{"connections_max_idle_ms": 60000}`)}, + }, + }, + } +} func failIfError(err error) { if err != nil { diff --git a/samples/exoscale.crossplane.io_kafka.yaml b/samples/exoscale.crossplane.io_kafka.yaml new file mode 100644 index 00000000..bbb1289d --- /dev/null +++ b/samples/exoscale.crossplane.io_kafka.yaml @@ -0,0 +1,25 @@ +apiVersion: exoscale.crossplane.io/v1 +kind: Kafka +metadata: + creationTimestamp: null + name: kafka-local-dev +spec: + forProvider: + ipFilter: + - 0.0.0.0/0 + kafkaSettings: + connections_max_idle_ms: 60000 + maintenance: + dayOfWeek: monday + timeOfDay: "12:00:00" + size: + plan: startup-2 + zone: ch-dk-2 + providerConfigRef: + name: provider-config + writeConnectionSecretToRef: + name: kafka-local-dev-details + namespace: default +status: + atProvider: + kafkaSettings: null From 
c9529a2a5fb09ea246a72d7468ef0dd24faac82f Mon Sep 17 00:00:00 2001 From: Fabian Fischer Date: Tue, 29 Nov 2022 10:14:45 +0100 Subject: [PATCH 13/21] Cleanup code, fix comments, fix typos --- operator/kafkacontroller/connection.go | 8 +++++--- operator/kafkacontroller/controller.go | 4 ++-- operator/kafkacontroller/observe.go | 22 ++++++++++++---------- operator/kafkacontroller/observe_test.go | 14 +++++++------- operator/kafkacontroller/webhook.go | 10 +++++----- test/e2e/kafka/01-connect.yaml | 4 ++-- 6 files changed, 33 insertions(+), 29 deletions(-) diff --git a/operator/kafkacontroller/connection.go b/operator/kafkacontroller/connection.go index 36615a5f..acf4f6fd 100644 --- a/operator/kafkacontroller/connection.go +++ b/operator/kafkacontroller/connection.go @@ -20,7 +20,8 @@ type connection struct { exo oapi.ClientWithResponsesInterface } -// Create implements managed.ExternalClient +// Create idempotently creates a Kafka instance. +// It will not return an "already exits" error. func (c connection) Create(ctx context.Context, mg resource.Managed) (managed.ExternalCreation, error) { log := controllerruntime.LoggerFrom(ctx) log.V(1).Info("creating resource") @@ -64,7 +65,8 @@ func (c connection) Create(ctx context.Context, mg resource.Managed) (managed.Ex return managed.ExternalCreation{}, nil } -// Delete implements managed.ExternalClient +// Delete idempotently deletes a kafka instance. +// It will not return a "not found" error. func (c connection) Delete(ctx context.Context, mg resource.Managed) error { log := controllerruntime.LoggerFrom(ctx) log.V(1).Info("deleting resource") @@ -84,7 +86,7 @@ func (c connection) Delete(ctx context.Context, mg resource.Managed) error { return nil } -// Update implements managed.ExternalClient +// Update the provided kafka instance. 
func (c connection) Update(ctx context.Context, mg resource.Managed) (managed.ExternalUpdate, error) { log := controllerruntime.LoggerFrom(ctx) log.V(1).Info("updating resource") diff --git a/operator/kafkacontroller/controller.go b/operator/kafkacontroller/controller.go index 58a2dd26..8fced5ee 100644 --- a/operator/kafkacontroller/controller.go +++ b/operator/kafkacontroller/controller.go @@ -22,7 +22,7 @@ type connector struct { recorder event.Recorder } -// Connect implements managed.ExternalConnecter. +// Connect to the exoscale kafka provider. func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.ExternalClient, error) { log := ctrl.LoggerFrom(ctx) log.V(1).Info("connecting resource") @@ -41,7 +41,7 @@ func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.E }, nil } -// SetupController adds a controller that reconciles managed resources. +// SetupController adds a controller that reconciles kafka resources. func SetupController(mgr ctrl.Manager) error { name := strings.ToLower(exoscalev1.KafkaGroupKind) recorder := event.NewAPIRecorder(mgr.GetEventRecorderFor(name)) diff --git a/operator/kafkacontroller/observe.go b/operator/kafkacontroller/observe.go index 1c348f4e..0897140a 100644 --- a/operator/kafkacontroller/observe.go +++ b/operator/kafkacontroller/observe.go @@ -19,7 +19,9 @@ import ( "github.com/vshn/provider-exoscale/operator/mapper" ) -// Observe implements managed.ExternalClient +// Observe the external kafka instance. +// Will return wether the the instance exits and if it is up-to-date. +// Observe will also update the status to the observed state and return connection details to connect to the instance. 
func (c connection) Observe(ctx context.Context, mg resource.Managed) (managed.ExternalObservation, error) { log := controllerruntime.LoggerFrom(ctx) log.V(1).Info("observing resource") @@ -63,7 +65,7 @@ func (c connection) Observe(ctx context.Context, mg resource.Managed) (managed.E return managed.ExternalObservation{}, fmt.Errorf("failed to get kafka connection details: %w", err) } - upToDate, diff := diffParamters(external, instance.Spec.ForProvider) + upToDate, diff := diffParameters(external, instance.Spec.ForProvider) return managed.ExternalObservation{ ResourceExists: true, @@ -146,17 +148,17 @@ func getConnectionDetails(external *oapi.DbaasServiceKafka, ca string) (map[stri } return map[string][]byte{ - "KAFKA_URI": []byte(uri), - "KAFKA_HOST": []byte(host), - "KAFKA_PORT": []byte(port), - "KAFKA_NODES": []byte(nodes), - "cert.pem": []byte(cert), - "key.pem": []byte(key), - "ca.crt": []byte(ca), + "KAFKA_URI": []byte(uri), + "KAFKA_HOST": []byte(host), + "KAFKA_PORT": []byte(port), + "KAFKA_NODES": []byte(nodes), + "service.cert": []byte(cert), + "service.key": []byte(key), + "ca.crt": []byte(ca), }, nil } -func diffParamters(external *oapi.DbaasServiceKafka, expected exoscalev1.KafkaParameters) (bool, string) { +func diffParameters(external *oapi.DbaasServiceKafka, expected exoscalev1.KafkaParameters) (bool, string) { actualIPFilter := []string{} if external.IpFilter != nil { actualIPFilter = *external.IpFilter diff --git a/operator/kafkacontroller/observe_test.go b/operator/kafkacontroller/observe_test.go index 8bd73325..8ab12e8a 100644 --- a/operator/kafkacontroller/observe_test.go +++ b/operator/kafkacontroller/observe_test.go @@ -86,13 +86,13 @@ func TestObserve_UpToDate_ConnectionDetails(t *testing.T) { assert.True(t, res.ResourceUpToDate, "report resource uptodate") require.NotNil(t, res.ConnectionDetails) expectedConnDetails := managed.ConnectionDetails{ - "KAFKA_URI": []byte("foobar.com:21701"), - "KAFKA_HOST": []byte("foobar.com"), - "KAFKA_PORT": 
[]byte("21701"), - "KAFKA_NODES": []byte("10.10.1.1:21701 10.10.1.2:21701 10.10.1.3:21701"), - "cert.pem": []byte("CERT"), - "key.pem": []byte("KEY"), - "ca.crt": []byte("CA"), + "KAFKA_URI": []byte("foobar.com:21701"), + "KAFKA_HOST": []byte("foobar.com"), + "KAFKA_PORT": []byte("21701"), + "KAFKA_NODES": []byte("10.10.1.1:21701 10.10.1.2:21701 10.10.1.3:21701"), + "service.cert": []byte("CERT"), + "service.key": []byte("KEY"), + "ca.crt": []byte("CA"), } assert.Equal(t, expectedConnDetails, res.ConnectionDetails) }) diff --git a/operator/kafkacontroller/webhook.go b/operator/kafkacontroller/webhook.go index 60504da3..d05d4cf5 100644 --- a/operator/kafkacontroller/webhook.go +++ b/operator/kafkacontroller/webhook.go @@ -14,7 +14,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" ) -// SetupWebhook adds a webhook for managed resources. +// SetupWebhook adds a webhook for kafka resources. func SetupWebhook(mgr ctrl.Manager) error { return ctrl.NewWebhookManagedBy(mgr). For(&exoscalev1.Kafka{}). @@ -24,12 +24,12 @@ func SetupWebhook(mgr ctrl.Manager) error { Complete() } -// Validator validates admission requests. +// Validator validates kafka admission requests. type Validator struct { log logr.Logger } -// ValidateCreate implements admission.CustomValidator. +// ValidateCreate validates the spec of a created kafka resource. func (v *Validator) ValidateCreate(_ context.Context, obj runtime.Object) error { instance, ok := obj.(*exoscalev1.Kafka) if !ok { @@ -40,7 +40,7 @@ func (v *Validator) ValidateCreate(_ context.Context, obj runtime.Object) error return validateSpec(instance.Spec.ForProvider) } -// ValidateUpdate implements admission.CustomValidator. +// ValidateUpdate validates the spec of an updated kafka resource and checks that no immutable field has been modified. 
func (v *Validator) ValidateUpdate(_ context.Context, oldObj, newObj runtime.Object) error { newInstance, ok := newObj.(*exoscalev1.Kafka) if !ok { @@ -59,7 +59,7 @@ func (v *Validator) ValidateUpdate(_ context.Context, oldObj, newObj runtime.Obj return validateImmutable(oldInstance.Spec.ForProvider, newInstance.Spec.ForProvider) } -// ValidateDelete implements admission.CustomValidator. +// ValidateDelete validates a delete. Currently does not validate anything. func (v *Validator) ValidateDelete(_ context.Context, obj runtime.Object) error { v.log.V(2).Info("validate delete (noop)") return nil diff --git a/test/e2e/kafka/01-connect.yaml b/test/e2e/kafka/01-connect.yaml index 93185d23..53c3d83c 100644 --- a/test/e2e/kafka/01-connect.yaml +++ b/test/e2e/kafka/01-connect.yaml @@ -9,8 +9,8 @@ data: - name: test TLS: cafile: /.kafka/certs/ca.crt - clientfile: /.kafka/certs/cert.pem - clientkeyfile: /.kafka/certs/key.pem + clientfile: /.kafka/certs/service.cert + clientkeyfile: /.kafka/certs/service.key insecure: false --- apiVersion: batch/v1 From 38f36ddb6f7f4aa7b7572d52caeedac274cafbf4 Mon Sep 17 00:00:00 2001 From: Fabian Fischer Date: Tue, 29 Nov 2022 18:05:15 +0100 Subject: [PATCH 14/21] Switch to one file per method for kafka controller --- operator/kafkacontroller/controller.go | 5 + .../{connection.go => create.go} | 64 ----------- .../{connection_test.go => create_test.go} | 100 ------------------ operator/kafkacontroller/delete.go | 34 ++++++ operator/kafkacontroller/delete_test.go | 71 +++++++++++++ operator/kafkacontroller/update.go | 53 ++++++++++ operator/kafkacontroller/update_test.go | 62 +++++++++++ 7 files changed, 225 insertions(+), 164 deletions(-) rename operator/kafkacontroller/{connection.go => create.go} (51%) rename operator/kafkacontroller/{connection_test.go => create_test.go} (50%) create mode 100644 operator/kafkacontroller/delete.go create mode 100644 operator/kafkacontroller/delete_test.go create mode 100644 
operator/kafkacontroller/update.go create mode 100644 operator/kafkacontroller/update_test.go diff --git a/operator/kafkacontroller/controller.go b/operator/kafkacontroller/controller.go index 8fced5ee..917cef51 100644 --- a/operator/kafkacontroller/controller.go +++ b/operator/kafkacontroller/controller.go @@ -11,6 +11,7 @@ import ( "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" "github.com/crossplane/crossplane-runtime/pkg/resource" exoscalesdk "github.com/exoscale/egoscale/v2" + "github.com/exoscale/egoscale/v2/oapi" exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" "github.com/vshn/provider-exoscale/operator/pipelineutil" ctrl "sigs.k8s.io/controller-runtime" @@ -22,6 +23,10 @@ type connector struct { recorder event.Recorder } +type connection struct { + exo oapi.ClientWithResponsesInterface +} + // Connect to the exoscale kafka provider. func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.ExternalClient, error) { log := ctrl.LoggerFrom(ctx) diff --git a/operator/kafkacontroller/connection.go b/operator/kafkacontroller/create.go similarity index 51% rename from operator/kafkacontroller/connection.go rename to operator/kafkacontroller/create.go index acf4f6fd..b8358387 100644 --- a/operator/kafkacontroller/connection.go +++ b/operator/kafkacontroller/create.go @@ -16,10 +16,6 @@ import ( controllerruntime "sigs.k8s.io/controller-runtime" ) -type connection struct { - exo oapi.ClientWithResponsesInterface -} - // Create idempotently creates a Kafka instance. // It will not return an "already exits" error. func (c connection) Create(ctx context.Context, mg resource.Managed) (managed.ExternalCreation, error) { @@ -64,63 +60,3 @@ func (c connection) Create(ctx context.Context, mg resource.Managed) (managed.Ex log.V(2).Info("response", "body", string(resp.Body)) return managed.ExternalCreation{}, nil } - -// Delete idempotently deletes a kafka instance. -// It will not return a "not found" error. 
-func (c connection) Delete(ctx context.Context, mg resource.Managed) error { - log := controllerruntime.LoggerFrom(ctx) - log.V(1).Info("deleting resource") - - instance, ok := mg.(*exoscalev1.Kafka) - if !ok { - return fmt.Errorf("invalid managed resource type %T for kafka connection", mg) - } - resp, err := c.exo.DeleteDbaasServiceWithResponse(ctx, instance.GetInstanceName()) - if err != nil { - if errors.Is(err, exoscaleapi.ErrNotFound) { - return nil - } - return fmt.Errorf("cannot delete kafak instance: %w", err) - } - log.V(2).Info("response", "body", string(resp.Body)) - return nil -} - -// Update the provided kafka instance. -func (c connection) Update(ctx context.Context, mg resource.Managed) (managed.ExternalUpdate, error) { - log := controllerruntime.LoggerFrom(ctx) - log.V(1).Info("updating resource") - - instance, ok := mg.(*exoscalev1.Kafka) - if !ok { - return managed.ExternalUpdate{}, fmt.Errorf("invalid managed resource type %T for kafka connection", mg) - } - - spec := instance.Spec.ForProvider - ipFilter := []string(spec.IPFilter) - settings, err := mapper.ToMap(spec.KafkaSettings) - if err != nil { - return managed.ExternalUpdate{}, fmt.Errorf("invalid kafka settings: %w", err) - } - - body := oapi.UpdateDbaasServiceKafkaJSONRequestBody{ - IpFilter: &ipFilter, - KafkaSettings: &settings, - Maintenance: &struct { - Dow oapi.UpdateDbaasServiceKafkaJSONBodyMaintenanceDow "json:\"dow\"" - Time string "json:\"time\"" - }{ - Dow: oapi.UpdateDbaasServiceKafkaJSONBodyMaintenanceDow(spec.Maintenance.DayOfWeek), - Time: spec.Maintenance.TimeOfDay.String(), - }, - Plan: &spec.Size.Plan, - TerminationProtection: &spec.TerminationProtection, - } - - resp, err := c.exo.UpdateDbaasServiceKafkaWithResponse(ctx, oapi.DbaasServiceName(instance.GetInstanceName()), body) - if err != nil { - return managed.ExternalUpdate{}, fmt.Errorf("unable to update instance: %w", err) - } - log.V(2).Info("response", "body", string(resp.Body)) - return managed.ExternalUpdate{}, 
nil -} diff --git a/operator/kafkacontroller/connection_test.go b/operator/kafkacontroller/create_test.go similarity index 50% rename from operator/kafkacontroller/connection_test.go rename to operator/kafkacontroller/create_test.go index 2d3aa5aa..da8eef39 100644 --- a/operator/kafkacontroller/connection_test.go +++ b/operator/kafkacontroller/create_test.go @@ -89,103 +89,3 @@ func TestCreate_invalidInput(t *testing.T) { assert.Error(t, err) }) } - -func TestUpdate(t *testing.T) { - exoMock := &operatortest.ClientWithResponsesInterface{} - c := connection{ - exo: exoMock, - } - instance := exoscalev1.Kafka{ - ObjectMeta: metav1.ObjectMeta{ - Name: "bar", - }, - } - instance.Spec.ForProvider.Size.Plan = "businesss-4" - instance.Spec.ForProvider.IPFilter = []string{ - "1.0.0.0/8", - "2.0.0.0/8", - } - instance.Spec.ForProvider.Maintenance.DayOfWeek = "monday" - instance.Spec.ForProvider.Maintenance.TimeOfDay = "11:11:11" - ctx := context.Background() - - exoMock.On("UpdateDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("bar"), - mock.MatchedBy(func(req oapi.UpdateDbaasServiceKafkaJSONRequestBody) bool { - return req.IpFilter != nil && len(*req.IpFilter) == 2 && (*req.IpFilter)[0] == "1.0.0.0/8" && - req.Plan != nil && *req.Plan == "businesss-4" && - req.Maintenance != nil && req.Maintenance.Dow == "monday" && req.Maintenance.Time == "11:11:11" - })). - Return(&oapi.UpdateDbaasServiceKafkaResponse{Body: []byte{}}, nil). 
- Once() - - assert.NotPanics(t, func() { - _, err := c.Update(ctx, &instance) - require.NoError(t, err) - }) -} - -func TestUpdate_invalidInput(t *testing.T) { - exoMock := &operatortest.ClientWithResponsesInterface{} - c := connection{ - exo: exoMock, - } - ctx := context.Background() - assert.NotPanics(t, func() { - _, err := c.Update(ctx, nil) - assert.Error(t, err) - }) -} - -func TestDelete(t *testing.T) { - exoMock := &operatortest.ClientWithResponsesInterface{} - c := connection{ - exo: exoMock, - } - instance := exoscalev1.Kafka{ - ObjectMeta: metav1.ObjectMeta{ - Name: "buzz", - }, - } - ctx := context.Background() - - exoMock.On("DeleteDbaasServiceWithResponse", mock.Anything, "buzz"). - Return(&oapi.DeleteDbaasServiceResponse{Body: []byte{}}, nil). - Once() - - assert.NotPanics(t, func() { - err := c.Delete(ctx, &instance) - require.NoError(t, err) - }) -} -func TestDelete_invalidInput(t *testing.T) { - exoMock := &operatortest.ClientWithResponsesInterface{} - c := connection{ - exo: exoMock, - } - ctx := context.Background() - assert.NotPanics(t, func() { - err := c.Delete(ctx, nil) - assert.Error(t, err) - }) -} -func TestDelete_Idempotent(t *testing.T) { - exoMock := &operatortest.ClientWithResponsesInterface{} - c := connection{ - exo: exoMock, - } - instance := exoscalev1.Kafka{ - ObjectMeta: metav1.ObjectMeta{ - Name: "buzz", - }, - } - ctx := context.Background() - - exoMock.On("DeleteDbaasServiceWithResponse", mock.Anything, "buzz"). - Return(nil, exoscaleapi.ErrNotFound). 
- Once()
-
- assert.NotPanics(t, func() {
- err := c.Delete(ctx, &instance)
- require.NoError(t, err)
- })
-}
diff --git a/operator/kafkacontroller/delete.go b/operator/kafkacontroller/delete.go
new file mode 100644
index 00000000..7567f691
--- /dev/null
+++ b/operator/kafkacontroller/delete.go
@@ -0,0 +1,34 @@
+package kafkacontroller
+
+import (
+ "context"
+ "errors"
+ "fmt"
+
+ exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1"
+
+ "github.com/crossplane/crossplane-runtime/pkg/resource"
+ exoscaleapi "github.com/exoscale/egoscale/v2/api"
+ controllerruntime "sigs.k8s.io/controller-runtime"
+)
+
+// Delete idempotently deletes a kafka instance.
+// It will not return a "not found" error.
+func (c connection) Delete(ctx context.Context, mg resource.Managed) error {
+ log := controllerruntime.LoggerFrom(ctx)
+ log.V(1).Info("deleting resource")
+
+ instance, ok := mg.(*exoscalev1.Kafka)
+ if !ok {
+ return fmt.Errorf("invalid managed resource type %T for kafka connection", mg)
+ }
+ resp, err := c.exo.DeleteDbaasServiceWithResponse(ctx, instance.GetInstanceName())
+ if err != nil {
+ if errors.Is(err, exoscaleapi.ErrNotFound) {
+ return nil
+ }
+ return fmt.Errorf("cannot delete kafka instance: %w", err)
+ }
+ log.V(2).Info("response", "body", string(resp.Body))
+ return nil
+}
diff --git a/operator/kafkacontroller/delete_test.go b/operator/kafkacontroller/delete_test.go
new file mode 100644
index 00000000..6838b4ea
--- /dev/null
+++ b/operator/kafkacontroller/delete_test.go
@@ -0,0 +1,71 @@
+package kafkacontroller
+
+import (
+ "context"
+ "testing"
+
+ exoscaleapi "github.com/exoscale/egoscale/v2/api"
+ "github.com/exoscale/egoscale/v2/oapi"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+ "github.com/stretchr/testify/require"
+
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+ exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1"
+ "github.com/vshn/provider-exoscale/internal/operatortest"
+)
+
+func 
TestDelete(t *testing.T) { + exoMock := &operatortest.ClientWithResponsesInterface{} + c := connection{ + exo: exoMock, + } + instance := exoscalev1.Kafka{ + ObjectMeta: metav1.ObjectMeta{ + Name: "buzz", + }, + } + ctx := context.Background() + + exoMock.On("DeleteDbaasServiceWithResponse", mock.Anything, "buzz"). + Return(&oapi.DeleteDbaasServiceResponse{Body: []byte{}}, nil). + Once() + + assert.NotPanics(t, func() { + err := c.Delete(ctx, &instance) + require.NoError(t, err) + }) +} +func TestDelete_invalidInput(t *testing.T) { + exoMock := &operatortest.ClientWithResponsesInterface{} + c := connection{ + exo: exoMock, + } + ctx := context.Background() + assert.NotPanics(t, func() { + err := c.Delete(ctx, nil) + assert.Error(t, err) + }) +} +func TestDelete_Idempotent(t *testing.T) { + exoMock := &operatortest.ClientWithResponsesInterface{} + c := connection{ + exo: exoMock, + } + instance := exoscalev1.Kafka{ + ObjectMeta: metav1.ObjectMeta{ + Name: "buzz", + }, + } + ctx := context.Background() + + exoMock.On("DeleteDbaasServiceWithResponse", mock.Anything, "buzz"). + Return(nil, exoscaleapi.ErrNotFound). + Once() + + assert.NotPanics(t, func() { + err := c.Delete(ctx, &instance) + require.NoError(t, err) + }) +} diff --git a/operator/kafkacontroller/update.go b/operator/kafkacontroller/update.go new file mode 100644 index 00000000..c9288dfa --- /dev/null +++ b/operator/kafkacontroller/update.go @@ -0,0 +1,53 @@ +package kafkacontroller + +import ( + "context" + "fmt" + + exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + "github.com/vshn/provider-exoscale/operator/mapper" + + "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" + "github.com/crossplane/crossplane-runtime/pkg/resource" + "github.com/exoscale/egoscale/v2/oapi" + controllerruntime "sigs.k8s.io/controller-runtime" +) + +// Update the provided kafka instance. 
+func (c connection) Update(ctx context.Context, mg resource.Managed) (managed.ExternalUpdate, error) { + log := controllerruntime.LoggerFrom(ctx) + log.V(1).Info("updating resource") + + instance, ok := mg.(*exoscalev1.Kafka) + if !ok { + return managed.ExternalUpdate{}, fmt.Errorf("invalid managed resource type %T for kafka connection", mg) + } + + spec := instance.Spec.ForProvider + ipFilter := []string(spec.IPFilter) + settings, err := mapper.ToMap(spec.KafkaSettings) + if err != nil { + return managed.ExternalUpdate{}, fmt.Errorf("invalid kafka settings: %w", err) + } + + body := oapi.UpdateDbaasServiceKafkaJSONRequestBody{ + IpFilter: &ipFilter, + KafkaSettings: &settings, + Maintenance: &struct { + Dow oapi.UpdateDbaasServiceKafkaJSONBodyMaintenanceDow "json:\"dow\"" + Time string "json:\"time\"" + }{ + Dow: oapi.UpdateDbaasServiceKafkaJSONBodyMaintenanceDow(spec.Maintenance.DayOfWeek), + Time: spec.Maintenance.TimeOfDay.String(), + }, + Plan: &spec.Size.Plan, + TerminationProtection: &spec.TerminationProtection, + } + + resp, err := c.exo.UpdateDbaasServiceKafkaWithResponse(ctx, oapi.DbaasServiceName(instance.GetInstanceName()), body) + if err != nil { + return managed.ExternalUpdate{}, fmt.Errorf("unable to update instance: %w", err) + } + log.V(2).Info("response", "body", string(resp.Body)) + return managed.ExternalUpdate{}, nil +} diff --git a/operator/kafkacontroller/update_test.go b/operator/kafkacontroller/update_test.go new file mode 100644 index 00000000..7b6305d6 --- /dev/null +++ b/operator/kafkacontroller/update_test.go @@ -0,0 +1,62 @@ +package kafkacontroller + +import ( + "context" + "testing" + + "github.com/exoscale/egoscale/v2/oapi" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" + "github.com/vshn/provider-exoscale/internal/operatortest" +) + +func 
TestUpdate(t *testing.T) { + exoMock := &operatortest.ClientWithResponsesInterface{} + c := connection{ + exo: exoMock, + } + instance := exoscalev1.Kafka{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bar", + }, + } + instance.Spec.ForProvider.Size.Plan = "businesss-4" + instance.Spec.ForProvider.IPFilter = []string{ + "1.0.0.0/8", + "2.0.0.0/8", + } + instance.Spec.ForProvider.Maintenance.DayOfWeek = "monday" + instance.Spec.ForProvider.Maintenance.TimeOfDay = "11:11:11" + ctx := context.Background() + + exoMock.On("UpdateDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("bar"), + mock.MatchedBy(func(req oapi.UpdateDbaasServiceKafkaJSONRequestBody) bool { + return req.IpFilter != nil && len(*req.IpFilter) == 2 && (*req.IpFilter)[0] == "1.0.0.0/8" && + req.Plan != nil && *req.Plan == "businesss-4" && + req.Maintenance != nil && req.Maintenance.Dow == "monday" && req.Maintenance.Time == "11:11:11" + })). + Return(&oapi.UpdateDbaasServiceKafkaResponse{Body: []byte{}}, nil). 
+ Once() + + assert.NotPanics(t, func() { + _, err := c.Update(ctx, &instance) + require.NoError(t, err) + }) +} + +func TestUpdate_invalidInput(t *testing.T) { + exoMock := &operatortest.ClientWithResponsesInterface{} + c := connection{ + exo: exoMock, + } + ctx := context.Background() + assert.NotPanics(t, func() { + _, err := c.Update(ctx, nil) + assert.Error(t, err) + }) +} From 22265e628ab1e5762e1f1f680ba3aca869a93109 Mon Sep 17 00:00:00 2001 From: Fabian Fischer Date: Tue, 29 Nov 2022 18:07:46 +0100 Subject: [PATCH 15/21] Use plural for kafka webhook name --- apis/exoscale/v1/kafka_types.go | 2 +- package/webhook/manifests.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apis/exoscale/v1/kafka_types.go b/apis/exoscale/v1/kafka_types.go index a550e20f..c93df173 100644 --- a/apis/exoscale/v1/kafka_types.go +++ b/apis/exoscale/v1/kafka_types.go @@ -59,7 +59,7 @@ type KafkaStatus struct { // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" // +kubebuilder:subresource:status // +kubebuilder:resource:scope=Cluster,categories={crossplane,exoscale} -// +kubebuilder:webhook:verbs=create;update,path=/validate-exoscale-crossplane-io-v1-kafka,mutating=false,failurePolicy=fail,groups=exoscale.crossplane.io,resources=kafkas,versions=v1,name=kafka.exoscale.crossplane.io,sideEffects=None,admissionReviewVersions=v1 +// +kubebuilder:webhook:verbs=create;update,path=/validate-exoscale-crossplane-io-v1-kafka,mutating=false,failurePolicy=fail,groups=exoscale.crossplane.io,resources=kafkas,versions=v1,name=kafkas.exoscale.crossplane.io,sideEffects=None,admissionReviewVersions=v1 // Kafka is the API for creating Kafka. 
type Kafka struct { diff --git a/package/webhook/manifests.yaml b/package/webhook/manifests.yaml index ee9b5b45..ad9e86c8 100644 --- a/package/webhook/manifests.yaml +++ b/package/webhook/manifests.yaml @@ -53,7 +53,7 @@ webhooks: namespace: system path: /validate-exoscale-crossplane-io-v1-kafka failurePolicy: Fail - name: kafka.exoscale.crossplane.io + name: kafkas.exoscale.crossplane.io rules: - apiGroups: - exoscale.crossplane.io From 485afb7fd1a53e2a55e77f1a0e737da250333aec Mon Sep 17 00:00:00 2001 From: Fabian Fischer Date: Tue, 29 Nov 2022 18:51:23 +0100 Subject: [PATCH 16/21] Add option to set version for kafka on creation --- apis/exoscale/v1/kafka_types.go | 3 ++ operator/kafkacontroller/create.go | 2 ++ operator/kafkacontroller/observe.go | 1 + operator/kafkacontroller/observe_test.go | 36 +++++++++++++++++++ operator/kafkacontroller/webhook.go | 24 +++++++++++-- operator/kafkacontroller/webhook_test.go | 20 +++++++++++ .../crds/exoscale.crossplane.io_kafkas.yaml | 4 +++ 7 files changed, 87 insertions(+), 3 deletions(-) diff --git a/apis/exoscale/v1/kafka_types.go b/apis/exoscale/v1/kafka_types.go index c93df173..dbcca07a 100644 --- a/apis/exoscale/v1/kafka_types.go +++ b/apis/exoscale/v1/kafka_types.go @@ -21,6 +21,9 @@ type KafkaParameters struct { DBaaSParameters `json:",inline"` + // Version is the (minor) version identifier for the instance (e.g. "3.2"). + Version string `json:"version,omitempty"` + // KafkaSettings contains additional Kafka settings. 
KafkaSettings runtime.RawExtension `json:"kafkaSettings,omitempty"`
 }
diff --git a/operator/kafkacontroller/create.go b/operator/kafkacontroller/create.go
index b8358387..598a0026 100644
--- a/operator/kafkacontroller/create.go
+++ b/operator/kafkacontroller/create.go
@@ -8,6 +8,7 @@ import (
 exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1"
 "github.com/vshn/provider-exoscale/operator/mapper"
+ "k8s.io/utils/pointer"
 "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
 "github.com/crossplane/crossplane-runtime/pkg/resource"
@@ -45,6 +46,7 @@ func (c connection) Create(ctx context.Context, mg resource.Managed) (managed.Ex
 Time: spec.Maintenance.TimeOfDay.String(),
 },
 Plan: spec.Size.Plan,
+ Version: pointer.String(spec.Version),
 TerminationProtection: &spec.TerminationProtection,
 }
diff --git a/operator/kafkacontroller/observe.go b/operator/kafkacontroller/observe.go
index 0897140a..9a3c9878 100644
--- a/operator/kafkacontroller/observe.go
+++ b/operator/kafkacontroller/observe.go
@@ -181,6 +181,7 @@ func diffParameters(external *oapi.DbaasServiceKafka, expected exoscalev1.KafkaP
 },
 IPFilter: actualIPFilter,
 },
+ Version: expected.Version, // We should never mark something as out of date if the versions don't match as update can't modify the version anyway
 KafkaSettings: actualKafkaSettings,
 }
diff --git a/operator/kafkacontroller/observe_test.go b/operator/kafkacontroller/observe_test.go
index 8ab12e8a..330d3370 100644
--- a/operator/kafkacontroller/observe_test.go
+++ b/operator/kafkacontroller/observe_test.go
@@ -230,6 +230,42 @@ func TestObserve_UpToDate_Condition_Ready(t *testing.T) {
 })
 }
+func TestObserve_UpToDate_WithVersion(t *testing.T) {
+ exoMock := &operatortest.ClientWithResponsesInterface{}
+ c := connection{
+ exo: exoMock,
+ }
+ instance := sampleKafka("foo")
+ instance.Spec.ForProvider.Version = "3.2"
+ found := sampleAPIKafka("foo")
+ found.Version = pointer.String("3.2.1")
+
+ ctx := context.Background()
+ 
exoMock.On("GetDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("foo")). + Return(&oapi.GetDbaasServiceKafkaResponse{ + Body: []byte{}, + JSON200: found, + }, nil). + Once() + exoMock.On("GetDbaasCaCertificateWithResponse", mock.Anything). + Return(&oapi.GetDbaasCaCertificateResponse{ + JSON200: &struct { + Certificate *string "json:\"certificate,omitempty\"" + }{ + Certificate: pointer.String("CA"), + }, + }, nil). + Once() + + assert.NotPanics(t, func() { + res, err := c.Observe(ctx, &instance) + assert.NoError(t, err) + require.NotNil(t, res) + assert.True(t, res.ResourceExists, "report resource exits") + assert.True(t, res.ResourceUpToDate, "report resource uptodate") + }) +} + func TestObserve_Outdated(t *testing.T) { exoMock := &operatortest.ClientWithResponsesInterface{} c := connection{ diff --git a/operator/kafkacontroller/webhook.go b/operator/kafkacontroller/webhook.go index d05d4cf5..b99076e6 100644 --- a/operator/kafkacontroller/webhook.go +++ b/operator/kafkacontroller/webhook.go @@ -56,7 +56,7 @@ func (v *Validator) ValidateUpdate(_ context.Context, oldObj, newObj runtime.Obj if err != nil { return err } - return validateImmutable(oldInstance.Spec.ForProvider, newInstance.Spec.ForProvider) + return validateImmutable(*oldInstance, *newInstance) } // ValidateDelete validates a delete. Currently does not validate anything. 
@@ -89,8 +89,11 @@ func validateKafkaSettings(obj exoscalev1.KafkaParameters) error {
 return webhook.ValidateRawExtension(obj.KafkaSettings)
 }
-func validateImmutable(oldParams, newParams exoscalev1.KafkaParameters) error {
- return compareZone(oldParams, newParams)
+func validateImmutable(oldInst, newInst exoscalev1.Kafka) error {
+ return multierr.Combine(
+ compareZone(oldInst.Spec.ForProvider, newInst.Spec.ForProvider),
+ compareVersion(oldInst, newInst),
+ )
 }
 func compareZone(oldParams, newParams exoscalev1.KafkaParameters) error {
@@ -99,3 +102,18 @@ func compareZone(oldParams, newParams exoscalev1.KafkaParameters) error {
 }
 return nil
 }
+
+func compareVersion(oldInst, newInst exoscalev1.Kafka) error {
+ if oldInst.Spec.ForProvider.Version == newInst.Spec.ForProvider.Version {
+ return nil
+ }
+ if newInst.Spec.ForProvider.Version == "" {
+ // Setting version to empty string should always be fine
+ return nil
+ }
+ if oldInst.Spec.ForProvider.Version == "" {
+ // Fall back to reported version if no version was set before
+ oldInst.Spec.ForProvider.Version = oldInst.Status.AtProvider.Version
+ }
+ return webhook.ValidateVersion(oldInst.Status.AtProvider.Version, oldInst.Spec.ForProvider.Version, newInst.Spec.ForProvider.Version)
+}
diff --git a/operator/kafkacontroller/webhook_test.go b/operator/kafkacontroller/webhook_test.go
index 382154a4..86ff4cc4 100644
--- a/operator/kafkacontroller/webhook_test.go
+++ b/operator/kafkacontroller/webhook_test.go
@@ -69,5 +69,25 @@ func TestWebhook_Update(t *testing.T) {
 err := v.ValidateUpdate(ctx, &base, &inst)
 assert.Error(t, err)
 })
+ t.Run("change unsupported version", func(t *testing.T) {
+ newInst := base
+ oldInst := base
+
+ oldInst.Status.AtProvider.Version = "3.2.1"
+ newInst.Spec.ForProvider.Version = "3.3"
+
+ err := v.ValidateUpdate(ctx, &oldInst, &newInst)
+ assert.Error(t, err)
+ })
+ t.Run("change supported version", func(t *testing.T) {
+ newInst := base
+ oldInst := base
+
+ 
oldInst.Status.AtProvider.Version = "3.2.1"
+ newInst.Spec.ForProvider.Version = "3.2"
+
+ err := v.ValidateUpdate(ctx, &oldInst, &newInst)
+ assert.NoError(t, err)
+ })
}
diff --git a/package/crds/exoscale.crossplane.io_kafkas.yaml b/package/crds/exoscale.crossplane.io_kafkas.yaml
index 5155ea64..8bfd1081 100644
--- a/package/crds/exoscale.crossplane.io_kafkas.yaml
+++ b/package/crds/exoscale.crossplane.io_kafkas.yaml
@@ -116,6 +116,10 @@ spec:
 description: TerminationProtection protects against termination
 and powering off.
 type: boolean
+ version:
+ description: Version is the (minor) version identifier for the
+ instance (e.g. "3.2").
+ type: string
 zone:
 description: Zone is the datacenter identifier in which the
 instance runs in.
From 0f7a2e71b8729341cd7b80300487cb97558660fa Mon Sep 17 00:00:00 2001
From: Fabian Fischer
Date: Tue, 29 Nov 2022 19:22:14 +0100
Subject: [PATCH 17/21] Cleanup kafka unit tests a little
---
 operator/kafkacontroller/create_test.go | 40 ++++--
 operator/kafkacontroller/delete_test.go | 20 +--
 operator/kafkacontroller/observe_test.go | 167 +++++++----------------
 operator/kafkacontroller/update_test.go | 39 ++++--
 4 files changed, 116 insertions(+), 150 deletions(-)
diff --git a/operator/kafkacontroller/create_test.go b/operator/kafkacontroller/create_test.go
index da8eef39..d8dac088 100644
--- a/operator/kafkacontroller/create_test.go
+++ b/operator/kafkacontroller/create_test.go
@@ -35,19 +35,23 @@ func TestCreate(t *testing.T) {
 instance.Spec.ForProvider.Maintenance.TimeOfDay = "10:10:10"
 ctx := context.Background()
- exoMock.On("CreateDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("foo"),
- mock.MatchedBy(func(req oapi.CreateDbaasServiceKafkaJSONRequestBody) bool {
- return req.IpFilter != nil && len(*req.IpFilter) == 1 && (*req.IpFilter)[0] == "0.0.0.0/0" &&
- req.Plan == "businesss-8" &&
- req.Maintenance != nil && req.Maintenance.Dow == "monday" && req.Maintenance.Time == "10:10:10"
- })). 
- Return(&oapi.CreateDbaasServiceKafkaResponse{Body: []byte{}}, nil). - Once() + createReq := mockCreateKafkaCall(exoMock, "foo", nil) assert.NotPanics(t, func() { _, err := c.Create(ctx, &instance) require.NoError(t, err) }) + if assert.NotNil(t, createReq.IpFilter) { + assert.Len(t, *createReq.IpFilter, 1) + assert.Equal(t, (*createReq.IpFilter)[0], "0.0.0.0/0") + } + if assert.NotNil(t, createReq.Plan) { + assert.Equal(t, createReq.Plan, "businesss-8") + } + if assert.NotNil(t, createReq.Maintenance) { + assert.EqualValues(t, createReq.Maintenance.Dow, "monday") + assert.Equal(t, createReq.Maintenance.Time, "10:10:10") + } } func TestCreate_Idempotent(t *testing.T) { @@ -66,13 +70,11 @@ func TestCreate_Idempotent(t *testing.T) { } instance.Spec.ForProvider.Maintenance.DayOfWeek = "monday" instance.Spec.ForProvider.Maintenance.TimeOfDay = "10:10:10" - ctx := context.Background() - exoMock.On("CreateDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("foo"), mock.Anything). - Return(nil, fmt.Errorf("%w: Service name is already taken.", exoscaleapi.ErrInvalidRequest)). - Once() + _ = mockCreateKafkaCall(exoMock, "foo", fmt.Errorf("%w: Service name is already taken.", exoscaleapi.ErrInvalidRequest)) assert.NotPanics(t, func() { + ctx := context.Background() _, err := c.Create(ctx, &instance) require.NoError(t, err) }) @@ -89,3 +91,17 @@ func TestCreate_invalidInput(t *testing.T) { assert.Error(t, err) }) } + +func mockCreateKafkaCall(m *operatortest.ClientWithResponsesInterface, name string, err error) *oapi.CreateDbaasServiceKafkaJSONRequestBody { + createReq := &oapi.CreateDbaasServiceKafkaJSONRequestBody{} + + m.On("CreateDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName(name), + mock.MatchedBy(func(req oapi.CreateDbaasServiceKafkaJSONRequestBody) bool { + *createReq = req + return true + })). + Return(&oapi.CreateDbaasServiceKafkaResponse{Body: []byte{}}, err). 
+ Once() + + return createReq +} diff --git a/operator/kafkacontroller/delete_test.go b/operator/kafkacontroller/delete_test.go index 6838b4ea..cb90a9b8 100644 --- a/operator/kafkacontroller/delete_test.go +++ b/operator/kafkacontroller/delete_test.go @@ -26,13 +26,11 @@ func TestDelete(t *testing.T) { Name: "buzz", }, } - ctx := context.Background() - exoMock.On("DeleteDbaasServiceWithResponse", mock.Anything, "buzz"). - Return(&oapi.DeleteDbaasServiceResponse{Body: []byte{}}, nil). - Once() + mockDeleteKafkaCall(exoMock, "buzz", nil) assert.NotPanics(t, func() { + ctx := context.Background() err := c.Delete(ctx, &instance) require.NoError(t, err) }) @@ -42,8 +40,8 @@ func TestDelete_invalidInput(t *testing.T) { c := connection{ exo: exoMock, } - ctx := context.Background() assert.NotPanics(t, func() { + ctx := context.Background() err := c.Delete(ctx, nil) assert.Error(t, err) }) @@ -58,14 +56,18 @@ func TestDelete_Idempotent(t *testing.T) { Name: "buzz", }, } - ctx := context.Background() - exoMock.On("DeleteDbaasServiceWithResponse", mock.Anything, "buzz"). - Return(nil, exoscaleapi.ErrNotFound). - Once() + mockDeleteKafkaCall(exoMock, "buzz", exoscaleapi.ErrNotFound) assert.NotPanics(t, func() { + ctx := context.Background() err := c.Delete(ctx, &instance) require.NoError(t, err) }) } + +func mockDeleteKafkaCall(m *operatortest.ClientWithResponsesInterface, name string, err error) { + m.On("DeleteDbaasServiceWithResponse", mock.Anything, name). + Return(&oapi.DeleteDbaasServiceResponse{Body: []byte{}}, err). 
+ Once() +} diff --git a/operator/kafkacontroller/observe_test.go b/operator/kafkacontroller/observe_test.go index 330d3370..e7d30016 100644 --- a/operator/kafkacontroller/observe_test.go +++ b/operator/kafkacontroller/observe_test.go @@ -24,15 +24,12 @@ func TestObserve_NotExits(t *testing.T) { c := connection{ exo: exoMock, } - instance := sampleKafka("foo") - - ctx := context.Background() - exoMock.On("GetDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("foo")). - Return(&oapi.GetDbaasServiceKafkaResponse{Body: []byte{}}, exoscaleapi.ErrNotFound). - Once() + instance := sampleKafka("foo") + mockGetKafkaCall(exoMock, "foo", nil, exoscaleapi.ErrNotFound) assert.NotPanics(t, func() { + ctx := context.Background() res, err := c.Observe(ctx, &instance) assert.NoError(t, err) require.NotNil(t, res) @@ -45,6 +42,7 @@ func TestObserve_UpToDate_ConnectionDetails(t *testing.T) { c := connection{ exo: exoMock, } + instance := sampleKafka("foo") found := sampleAPIKafka("foo") found.Uri = pointer.String("foobar.com:21701") @@ -60,26 +58,13 @@ func TestObserve_UpToDate_ConnectionDetails(t *testing.T) { found.ConnectionInfo.AccessCert = pointer.String("CERT") found.ConnectionInfo.AccessKey = pointer.String("KEY") - ctx := context.Background() - - exoMock.On("GetDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("foo")). - Return(&oapi.GetDbaasServiceKafkaResponse{ - Body: []byte{}, - JSON200: found, - }, nil). - Once() - exoMock.On("GetDbaasCaCertificateWithResponse", mock.Anything). - Return(&oapi.GetDbaasCaCertificateResponse{ - JSON200: &struct { - Certificate *string "json:\"certificate,omitempty\"" - }{ - Certificate: pointer.String("CA"), - }, - }, nil). 
- Once() + mockGetKafkaCall(exoMock, "foo", found, nil) + mockCACall(exoMock) assert.NotPanics(t, func() { + ctx := context.Background() res, err := c.Observe(ctx, &instance) + assert.NoError(t, err) require.NotNil(t, res) assert.True(t, res.ResourceExists, "report resource exits") @@ -117,25 +102,13 @@ func TestObserve_UpToDate_Status(t *testing.T) { }, } - ctx := context.Background() - exoMock.On("GetDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("foo")). - Return(&oapi.GetDbaasServiceKafkaResponse{ - Body: []byte{}, - JSON200: found, - }, nil). - Once() - exoMock.On("GetDbaasCaCertificateWithResponse", mock.Anything). - Return(&oapi.GetDbaasCaCertificateResponse{ - JSON200: &struct { - Certificate *string "json:\"certificate,omitempty\"" - }{ - Certificate: pointer.String("CA"), - }, - }, nil). - Once() + mockGetKafkaCall(exoMock, "foo", found, nil) + mockCACall(exoMock) assert.NotPanics(t, func() { + ctx := context.Background() res, err := c.Observe(ctx, &instance) + assert.NoError(t, err) require.NotNil(t, res) assert.True(t, res.ResourceExists, "report resource exits") @@ -159,24 +132,11 @@ func TestObserve_UpToDate_Condition_NotReady(t *testing.T) { state := oapi.EnumServiceStateRebalancing found.State = &state - ctx := context.Background() - exoMock.On("GetDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("foo")). - Return(&oapi.GetDbaasServiceKafkaResponse{ - Body: []byte{}, - JSON200: found, - }, nil). - Once() - exoMock.On("GetDbaasCaCertificateWithResponse", mock.Anything). - Return(&oapi.GetDbaasCaCertificateResponse{ - JSON200: &struct { - Certificate *string "json:\"certificate,omitempty\"" - }{ - Certificate: pointer.String("CA"), - }, - }, nil). 
- Once() + mockGetKafkaCall(exoMock, "foo", found, nil) + mockCACall(exoMock) assert.NotPanics(t, func() { + ctx := context.Background() res, err := c.Observe(ctx, &instance) assert.NoError(t, err) require.NotNil(t, res) @@ -200,24 +160,11 @@ func TestObserve_UpToDate_Condition_Ready(t *testing.T) { state := oapi.EnumServiceStateRunning found.State = &state - ctx := context.Background() - exoMock.On("GetDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("foo")). - Return(&oapi.GetDbaasServiceKafkaResponse{ - Body: []byte{}, - JSON200: found, - }, nil). - Once() - exoMock.On("GetDbaasCaCertificateWithResponse", mock.Anything). - Return(&oapi.GetDbaasCaCertificateResponse{ - JSON200: &struct { - Certificate *string "json:\"certificate,omitempty\"" - }{ - Certificate: pointer.String("CA"), - }, - }, nil). - Once() + mockGetKafkaCall(exoMock, "foo", found, nil) + mockCACall(exoMock) assert.NotPanics(t, func() { + ctx := context.Background() res, err := c.Observe(ctx, &instance) assert.NoError(t, err) require.NotNil(t, res) @@ -240,24 +187,11 @@ func TestObserve_UpToDate_WithVersion(t *testing.T) { found := sampleAPIKafka("foo") found.Version = pointer.String("3.2.1") - ctx := context.Background() - exoMock.On("GetDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("foo")). - Return(&oapi.GetDbaasServiceKafkaResponse{ - Body: []byte{}, - JSON200: found, - }, nil). - Once() - exoMock.On("GetDbaasCaCertificateWithResponse", mock.Anything). - Return(&oapi.GetDbaasCaCertificateResponse{ - JSON200: &struct { - Certificate *string "json:\"certificate,omitempty\"" - }{ - Certificate: pointer.String("CA"), - }, - }, nil). 
- Once() + mockGetKafkaCall(exoMock, "foo", found, nil) + mockCACall(exoMock) assert.NotPanics(t, func() { + ctx := context.Background() res, err := c.Observe(ctx, &instance) assert.NoError(t, err) require.NotNil(t, res) @@ -275,24 +209,11 @@ func TestObserve_Outdated(t *testing.T) { found := sampleAPIKafka("foo") found.Maintenance.Dow = "tuesday" - ctx := context.Background() - exoMock.On("GetDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("foo")). - Return(&oapi.GetDbaasServiceKafkaResponse{ - Body: []byte{}, - JSON200: found, - }, nil). - Once() - exoMock.On("GetDbaasCaCertificateWithResponse", mock.Anything). - Return(&oapi.GetDbaasCaCertificateResponse{ - JSON200: &struct { - Certificate *string "json:\"certificate,omitempty\"" - }{ - Certificate: pointer.String("CA"), - }, - }, nil). - Once() + mockGetKafkaCall(exoMock, "foo", found, nil) + mockCACall(exoMock) assert.NotPanics(t, func() { + ctx := context.Background() res, err := c.Observe(ctx, &instance) assert.NoError(t, err) require.NotNil(t, res) @@ -318,24 +239,11 @@ func TestObserve_Outdated_Settings(t *testing.T) { "count": 2, } - ctx := context.Background() - exoMock.On("GetDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("foo")). - Return(&oapi.GetDbaasServiceKafkaResponse{ - Body: []byte{}, - JSON200: found, - }, nil). - Once() - exoMock.On("GetDbaasCaCertificateWithResponse", mock.Anything). - Return(&oapi.GetDbaasCaCertificateResponse{ - JSON200: &struct { - Certificate *string "json:\"certificate,omitempty\"" - }{ - Certificate: pointer.String("CA"), - }, - }, nil). 
- Once() + mockGetKafkaCall(exoMock, "foo", found, nil) + mockCACall(exoMock) assert.NotPanics(t, func() { + ctx := context.Background() res, err := c.Observe(ctx, &instance) assert.NoError(t, err) require.NotNil(t, res) @@ -394,3 +302,24 @@ func sampleAPIKafka(name string) *oapi.DbaasServiceKafka { return &res } + +func mockGetKafkaCall(m *operatortest.ClientWithResponsesInterface, name string, found *oapi.DbaasServiceKafka, err error) { + m.On("GetDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName(name)). + Return(&oapi.GetDbaasServiceKafkaResponse{ + Body: []byte{}, + JSON200: found, + }, err). + Once() + +} +func mockCACall(m *operatortest.ClientWithResponsesInterface) { + m.On("GetDbaasCaCertificateWithResponse", mock.Anything). + Return(&oapi.GetDbaasCaCertificateResponse{ + JSON200: &struct { + Certificate *string "json:\"certificate,omitempty\"" + }{ + Certificate: pointer.String("CA"), + }, + }, nil). + Once() +} diff --git a/operator/kafkacontroller/update_test.go b/operator/kafkacontroller/update_test.go index 7b6305d6..2401f8f7 100644 --- a/operator/kafkacontroller/update_test.go +++ b/operator/kafkacontroller/update_test.go @@ -32,21 +32,26 @@ func TestUpdate(t *testing.T) { } instance.Spec.ForProvider.Maintenance.DayOfWeek = "monday" instance.Spec.ForProvider.Maintenance.TimeOfDay = "11:11:11" - ctx := context.Background() - exoMock.On("UpdateDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName("bar"), - mock.MatchedBy(func(req oapi.UpdateDbaasServiceKafkaJSONRequestBody) bool { - return req.IpFilter != nil && len(*req.IpFilter) == 2 && (*req.IpFilter)[0] == "1.0.0.0/8" && - req.Plan != nil && *req.Plan == "businesss-4" && - req.Maintenance != nil && req.Maintenance.Dow == "monday" && req.Maintenance.Time == "11:11:11" - })). - Return(&oapi.UpdateDbaasServiceKafkaResponse{Body: []byte{}}, nil). 
- Once() + updateReq := mockUpdateKafkaCall(exoMock, "bar", nil) assert.NotPanics(t, func() { + ctx := context.Background() _, err := c.Update(ctx, &instance) require.NoError(t, err) }) + + if assert.NotNil(t, updateReq.IpFilter) { + assert.Len(t, *updateReq.IpFilter, 2) + assert.Equal(t, (*updateReq.IpFilter)[0], "1.0.0.0/8") + } + if assert.NotNil(t, updateReq.Plan) { + assert.Equal(t, *updateReq.Plan, "businesss-4") + } + if assert.NotNil(t, updateReq.Maintenance) { + assert.EqualValues(t, updateReq.Maintenance.Dow, "monday") + assert.Equal(t, updateReq.Maintenance.Time, "11:11:11") + } } func TestUpdate_invalidInput(t *testing.T) { @@ -54,9 +59,23 @@ func TestUpdate_invalidInput(t *testing.T) { c := connection{ exo: exoMock, } - ctx := context.Background() assert.NotPanics(t, func() { + ctx := context.Background() _, err := c.Update(ctx, nil) assert.Error(t, err) }) } + +func mockUpdateKafkaCall(m *operatortest.ClientWithResponsesInterface, name string, err error) *oapi.UpdateDbaasServiceKafkaJSONRequestBody { + updateReq := &oapi.UpdateDbaasServiceKafkaJSONRequestBody{} + + m.On("UpdateDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName(name), + mock.MatchedBy(func(req oapi.UpdateDbaasServiceKafkaJSONRequestBody) bool { + *updateReq = req + return true + })). + Return(&oapi.UpdateDbaasServiceKafkaResponse{Body: []byte{}}, err). 
+ Once() + + return updateReq +} From 55c05883510e75c38a75772fe2a6a801fbffefda Mon Sep 17 00:00:00 2001 From: Fabian Fischer Date: Thu, 1 Dec 2022 15:41:06 +0100 Subject: [PATCH 18/21] Only set version set to a non empty string --- operator/kafkacontroller/create.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/operator/kafkacontroller/create.go b/operator/kafkacontroller/create.go index 598a0026..271059f8 100644 --- a/operator/kafkacontroller/create.go +++ b/operator/kafkacontroller/create.go @@ -8,7 +8,6 @@ import ( exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" "github.com/vshn/provider-exoscale/operator/mapper" - "k8s.io/utils/pointer" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" "github.com/crossplane/crossplane-runtime/pkg/resource" @@ -34,6 +33,10 @@ func (c connection) Create(ctx context.Context, mg resource.Managed) (managed.Ex if err != nil { return managed.ExternalCreation{}, fmt.Errorf("invalid kafka settings: %w", err) } + var version *string + if spec.Version != "" { + version = &spec.Version + } body := oapi.CreateDbaasServiceKafkaJSONRequestBody{ IpFilter: &ipFilter, @@ -46,7 +49,7 @@ func (c connection) Create(ctx context.Context, mg resource.Managed) (managed.Ex Time: spec.Maintenance.TimeOfDay.String(), }, Plan: spec.Size.Plan, - Version: pointer.String(spec.Version), + Version: version, TerminationProtection: &spec.TerminationProtection, } From 48d5ba414843302bd0d1ec31e77f9269370d4ea3 Mon Sep 17 00:00:00 2001 From: Fabian Fischer Date: Fri, 2 Dec 2022 17:20:45 +0100 Subject: [PATCH 19/21] Fix kafka and kafka REST setting mixup --- operator/kafkacontroller/observe.go | 2 +- operator/kafkacontroller/observe_test.go | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/operator/kafkacontroller/observe.go b/operator/kafkacontroller/observe.go index 9a3c9878..36bf8277 100644 --- a/operator/kafkacontroller/observe.go +++ b/operator/kafkacontroller/observe.go @@ -163,7 
+163,7 @@ func diffParameters(external *oapi.DbaasServiceKafka, expected exoscalev1.KafkaP if external.IpFilter != nil { actualIPFilter = *external.IpFilter } - actualKafkaSettings, err := mapper.ToRawExtension(external.KafkaRestSettings) + actualKafkaSettings, err := mapper.ToRawExtension(external.KafkaSettings) if err != nil { return false, err.Error() } diff --git a/operator/kafkacontroller/observe_test.go b/operator/kafkacontroller/observe_test.go index e7d30016..b4798a8f 100644 --- a/operator/kafkacontroller/observe_test.go +++ b/operator/kafkacontroller/observe_test.go @@ -16,6 +16,7 @@ import ( "github.com/vshn/provider-exoscale/operator/mapper" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/utils/pointer" ) @@ -265,6 +266,7 @@ func sampleKafka(name string) exoscalev1.Kafka { instance.Spec.ForProvider.Maintenance.DayOfWeek = "monday" instance.Spec.ForProvider.Maintenance.TimeOfDay = "10:10:10" instance.Spec.ForProvider.Zone = "ch-dk-2" + instance.Spec.ForProvider.KafkaSettings = runtime.RawExtension{Raw: []byte(`{"connections_max_idle_ms":60000}`)} return instance } @@ -278,6 +280,9 @@ func sampleAPIKafka(name string) *oapi.DbaasServiceKafka { Dow: "monday", Time: "10:10:10", } + res.KafkaSettings = &map[string]interface{}{ + "connections_max_idle_ms": 60000, + } nodes := []string{"194.182.160.164:21701", "159.100.244.100:21701", From b312939e5ad356833f6bd07477a5562ff594c588 Mon Sep 17 00:00:00 2001 From: Fabian Fischer <10788152+glrf@users.noreply.github.com> Date: Mon, 5 Dec 2022 10:20:02 +0100 Subject: [PATCH 20/21] Fix typo and minor formatting improvements Co-authored-by: Michael Weibel <307427+mweibel@users.noreply.github.com> --- operator/kafkacontroller/delete.go | 2 +- operator/kafkacontroller/observe.go | 9 ++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/operator/kafkacontroller/delete.go b/operator/kafkacontroller/delete.go index 7567f691..0b0349cc 
100644 --- a/operator/kafkacontroller/delete.go +++ b/operator/kafkacontroller/delete.go @@ -27,7 +27,7 @@ func (c connection) Delete(ctx context.Context, mg resource.Managed) error { if errors.Is(err, exoscaleapi.ErrNotFound) { return nil } - return fmt.Errorf("cannot delete kafak instance: %w", err) + return fmt.Errorf("cannot delete kafka instance: %w", err) } log.V(2).Info("response", "body", string(resp.Body)) return nil diff --git a/operator/kafkacontroller/observe.go b/operator/kafkacontroller/observe.go index 36bf8277..11ae4a73 100644 --- a/operator/kafkacontroller/observe.go +++ b/operator/kafkacontroller/observe.go @@ -68,11 +68,10 @@ func (c connection) Observe(ctx context.Context, mg resource.Managed) (managed.E upToDate, diff := diffParameters(external, instance.Spec.ForProvider) return managed.ExternalObservation{ - ResourceExists: true, - ResourceUpToDate: upToDate, - ResourceLateInitialized: false, - ConnectionDetails: connDetails, - Diff: diff, + ResourceExists: true, + ResourceUpToDate: upToDate, + ConnectionDetails: connDetails, + Diff: diff, }, nil } From 69b85dccc8465256423cbaeaecff8fa1fced0c9e Mon Sep 17 00:00:00 2001 From: Fabian Fischer Date: Mon, 5 Dec 2022 11:35:04 +0100 Subject: [PATCH 21/21] Remove multierr and make kafka webhook work the same as other webhooks --- go.mod | 2 +- operator/kafkacontroller/webhook.go | 24 ++++++++++++++---------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/go.mod b/go.mod index fcf3675d..154256a7 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/exoscale/egoscale v0.90.1 github.com/go-logr/logr v1.2.3 github.com/go-logr/zapr v1.2.3 + github.com/google/go-cmp v0.5.8 github.com/hashicorp/go-version v1.6.0 github.com/minio/minio-go/v7 v7.0.43 github.com/stretchr/testify v1.8.0 @@ -46,7 +47,6 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/gnostic v0.6.9 // indirect - 
github.com/google/go-cmp v0.5.8 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.3.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect diff --git a/operator/kafkacontroller/webhook.go b/operator/kafkacontroller/webhook.go index b99076e6..f94aefad 100644 --- a/operator/kafkacontroller/webhook.go +++ b/operator/kafkacontroller/webhook.go @@ -7,7 +7,6 @@ import ( exoscalev1 "github.com/vshn/provider-exoscale/apis/exoscale/v1" "github.com/vshn/provider-exoscale/operator/webhook" - "go.uber.org/multierr" ctrl "sigs.k8s.io/controller-runtime" "github.com/go-logr/logr" @@ -66,11 +65,15 @@ func (v *Validator) ValidateDelete(_ context.Context, obj runtime.Object) error } func validateSpec(params exoscalev1.KafkaParameters) error { - return multierr.Combine( - validateIpFilter(params), - validateMaintenanceSchedule(params), - validateKafkaSettings(params), - ) + err := validateIpFilter(params) + if err != nil { + return err + } + err = validateMaintenanceSchedule(params) + if err != nil { + return err + } + return validateKafkaSettings(params) } func validateIpFilter(params exoscalev1.KafkaParameters) error { @@ -90,10 +93,11 @@ func validateKafkaSettings(obj exoscalev1.KafkaParameters) error { } func validateImmutable(oldInst, newInst exoscalev1.Kafka) error { - return multierr.Combine( - compareZone(oldInst.Spec.ForProvider, newInst.Spec.ForProvider), - compareVersion(oldInst, newInst), - ) + err := compareZone(oldInst.Spec.ForProvider, newInst.Spec.ForProvider) + if err != nil { + return err + } + return compareVersion(oldInst, newInst) } func compareZone(oldParams, newParams exoscalev1.KafkaParameters) error {