From b09f4bef291ad38099d6c068fb50e64bff702eff Mon Sep 17 00:00:00 2001 From: Bohdan Siryk Date: Fri, 21 Jul 2023 15:46:23 +0300 Subject: [PATCH] issue-488, saving default kafka connect credentials was implemented --- apis/clusters/v1beta1/kafkaconnect_types.go | 22 ++++++++++ .../clusters/kafkaconnect_controller.go | 44 +++++++++++++++++++ pkg/instaclustr/client.go | 32 ++++++++++++++ pkg/instaclustr/interfaces.go | 1 + pkg/instaclustr/mock/client.go | 4 ++ 5 files changed, 103 insertions(+) diff --git a/apis/clusters/v1beta1/kafkaconnect_types.go b/apis/clusters/v1beta1/kafkaconnect_types.go index 2352a76c8..aa2659fe9 100644 --- a/apis/clusters/v1beta1/kafkaconnect_types.go +++ b/apis/clusters/v1beta1/kafkaconnect_types.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" @@ -687,3 +688,24 @@ func (tc *TargetCluster) ManagedClustersToInstAPI() (iClusters []*models.Managed } return } + +func (k *KafkaConnect) NewDefaultUserSecret(username, password string) *v1.Secret { + return &v1.Secret{ + TypeMeta: metav1.TypeMeta{ + Kind: models.SecretKind, + APIVersion: models.K8sAPIVersionV1, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf(models.DefaultUserSecretNameTemplate, models.DefaultUserSecretPrefix, k.Name), + Namespace: k.Namespace, + Labels: map[string]string{ + models.ControlledByLabel: k.Name, + models.DefaultSecretLabel: "true", + }, + }, + StringData: map[string]string{ + models.Username: username, + models.Password: password, + }, + } +} diff --git a/controllers/clusters/kafkaconnect_controller.go b/controllers/clusters/kafkaconnect_controller.go index 1cd49ded4..3af39fb4b 100644 --- a/controllers/clusters/kafkaconnect_controller.go +++ b/controllers/clusters/kafkaconnect_controller.go @@ -159,6 +159,21 @@ func (r *KafkaConnectReconciler) handleCreateCluster(ctx context.Context, kc *v1 "Cluster status check job is started", ) + err = r.createDefaultSecret(ctx, kc, l) + if err != nil { + l.Error(err, "Cannot create default secret for Kafka Connect", + "cluster name", kc.Spec.Name, + "clusterID", kc.Status.ID, + ) + r.EventRecorder.Eventf( + kc, models.Warning, models.CreationFailed, + "Default user secret creation on the Instaclustr is failed. Reason: %v", + err, + ) + + return models.ReconcileRequeue + } + return models.ExitReconcile } @@ -386,6 +401,35 @@ func (r *KafkaConnectReconciler) handleDeleteCluster(ctx context.Context, kc *v1 return models.ExitReconcile } +func (r *KafkaConnectReconciler) createDefaultSecret(ctx context.Context, kc *v1beta1.KafkaConnect, l logr.Logger) error { + username, password, err := r.API.GetDefaultCredentialsV1(kc.Status.ID) + if err != nil { + l.Error(err, "Cannot get default user creds for Kafka Connect cluster from the Instaclustr API", + "cluster ID", kc.Status.ID, + ) + r.EventRecorder.Eventf(kc, models.Warning, models.FetchFailed, + "Default user password fetch from the Instaclustr API is failed. Reason: %v", err, + ) + + return err + } + + secret := kc.NewDefaultUserSecret(username, password) + err = r.Create(ctx, secret) + if err != nil { + l.Error(err, "Cannot create secret with default user credentials", + "cluster ID", kc.Status.ID, + ) + r.EventRecorder.Eventf(kc, models.Warning, models.CreationFailed, + "Creating secret with default user credentials is failed. Reason: %v", err, + ) + + return err + } + + return nil +} + func (r *KafkaConnectReconciler) startClusterStatusJob(kc *v1beta1.KafkaConnect) error { job := r.newWatchStatusJob(kc) diff --git a/pkg/instaclustr/client.go b/pkg/instaclustr/client.go index 79ff1927f..ae38a2eb0 100644 --- a/pkg/instaclustr/client.go +++ b/pkg/instaclustr/client.go @@ -2184,3 +2184,35 @@ func (c *Client) DeleteUser(username, clusterID, app string) error { return nil } + +func (c *Client) GetDefaultCredentialsV1(clusterID string) (string, string, error) { + url := c.serverHostname + ClustersEndpointV1 + clusterID + + resp, err := c.DoRequest(url, http.MethodGet, nil) + if err != nil { + return "", "", err + } + defer resp.Body.Close() + + b, err := io.ReadAll(resp.Body) + if err != nil { + return "", "", err + } + + if resp.StatusCode != http.StatusAccepted { + return "", "", fmt.Errorf("status code: %d, message: %s", resp.StatusCode, b) + } + + type credentials struct { + Username string `json:"username"` + InstaclustrUserPassword string `json:"instaclustrUserPassword"` + } + + var creds credentials + err = json.Unmarshal(b, &creds) + if err != nil { + return "", "", err + } + + return creds.Username, creds.InstaclustrUserPassword, nil +} diff --git a/pkg/instaclustr/interfaces.go b/pkg/instaclustr/interfaces.go index f92a7bd82..19cb6f40a 100644 --- a/pkg/instaclustr/interfaces.go +++ b/pkg/instaclustr/interfaces.go @@ -96,4 +96,5 @@ type API interface { CreateUser(userSpec any, clusterID, app string) error DeleteUser(username, clusterID, app string) error ListAppVersions(app string) ([]*models.AppVersions, error) + GetDefaultCredentialsV1(clusterID string) (string, string, error) } diff --git a/pkg/instaclustr/mock/client.go b/pkg/instaclustr/mock/client.go index 660b2a674..b9df227f1 100644 --- a/pkg/instaclustr/mock/client.go +++ b/pkg/instaclustr/mock/client.go @@ -343,3 +343,7 @@ func (c *mockClient) CreateUser(userSpec any, clusterID, app string) error { func (c *mockClient) DeleteUser(username, clusterID, app string) error { panic("DeleteUser: is not implemented") } + +func (c *mockClient) GetDefaultCredentialsV1(clusterID string) (string, string, error) { + panic("GetDefaultCredentialsV1: is not implemented") +}