diff --git a/api/v1alpha1/zookeepercluster_types.go b/api/v1alpha1/zookeepercluster_types.go index 5595d0b..0a04e9d 100644 --- a/api/v1alpha1/zookeepercluster_types.go +++ b/api/v1alpha1/zookeepercluster_types.go @@ -35,6 +35,15 @@ type ZookeeperClusterSpec struct { Config map[string]int `json:"config,omitempty"` } +// ServerState is the server state of cluster, which takes from Zookeeper AdminServer +type ServerState struct { + Address string `json:"address"` + + PacketsSent int `json:"packets_sent"` + + PacketsReceived int `json:"packets_received"` +} + // ZookeeperClusterStatus defines the observed state of ZookeeperCluster type ZookeeperClusterStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster @@ -44,19 +53,19 @@ type ZookeeperClusterStatus struct { Replicas int32 `json:"replicas,omitempty"` // ReadyReplicas is the actual replicas count in the cluster - ReadyReplicas int32 `json:"readyReplicas,omitempty"` + ReadyReplicas int32 `json:"readyReplicas"` - // Address is the exposed service endpoint of the cluster - Address string `json:"address,omitempty"` + // Endpoint is the exposed service endpoint of the cluster + Endpoint string `json:"endpoint,omitempty"` - // Nodes is the cluster pod status, podIP and role - Nodes map[string]string `json:"nodes,omitempty"` + // Servers is the server list with state of cluster + Servers map[string][]ServerState `json:"servers,omitempty"` } //+kubebuilder:object:root=true //+kubebuilder:subresource:status //+kubebuilder:printcolumn:name="Ready",type=integer,JSONPath=`.status.readyReplicas`,description="The actual Zookeeper servers" -//+kubebuilder:printcolumn:name="Address",type=string,JSONPath=`.status.address`,description="The exposed service endpoint of the cluster" +//+kubebuilder:printcolumn:name="Endpoint",type=string,JSONPath=`.status.endpoint`,description="The exposed service endpoint of the cluster" // ZookeeperCluster is the Schema for the zookeeperclusters API type ZookeeperCluster struct { diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 76468bf..edd1485 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -25,6 +25,21 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ServerState) DeepCopyInto(out *ServerState) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ServerState. +func (in *ServerState) DeepCopy() *ServerState { + if in == nil { + return nil + } + out := new(ServerState) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ZookeeperCluster) DeepCopyInto(out *ZookeeperCluster) { *out = *in @@ -109,11 +124,19 @@ func (in *ZookeeperClusterSpec) DeepCopy() *ZookeeperClusterSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ZookeeperClusterStatus) DeepCopyInto(out *ZookeeperClusterStatus) { *out = *in - if in.Nodes != nil { - in, out := &in.Nodes, &out.Nodes - *out = make(map[string]string, len(*in)) + if in.Servers != nil { + in, out := &in.Servers, &out.Servers + *out = make(map[string][]ServerState, len(*in)) for key, val := range *in { - (*out)[key] = val + var outVal []ServerState + if val == nil { + (*out)[key] = nil + } else { + in, out := &val, &outVal + *out = make([]ServerState, len(*in)) + copy(*out, *in) + } + (*out)[key] = outVal } } } diff --git a/config/crd/bases/zookeeper.atmax.io_zookeeperclusters.yaml b/config/crd/bases/zookeeper.atmax.io_zookeeperclusters.yaml index f02ffb1..431f799 100644 --- a/config/crd/bases/zookeeper.atmax.io_zookeeperclusters.yaml +++ b/config/crd/bases/zookeeper.atmax.io_zookeeperclusters.yaml @@ -21,8 +21,8 @@ spec: name: Ready type: integer - description: The exposed service endpoint of the cluster - jsonPath: .status.address - name: Address + jsonPath: .status.endpoint + name: Endpoint type: string name: v1alpha1 schema: @@ -59,14 +59,9 @@ spec: status: description: ZookeeperClusterStatus defines the observed state of ZookeeperCluster properties: - address: - description: Address is the exposed service endpoint of the cluster + endpoint: + description: Endpoint is the exposed service endpoint of the cluster type: string - nodes: - additionalProperties: - type: string - description: Nodes is the cluster pod status, podIP and role - type: object readyReplicas: description: ReadyReplicas is the actual replicas count in the cluster format: int32 @@ -75,6 +70,28 @@ spec: description: Replicas is the desired replicas count in the cluster format: int32 type: integer + servers: + additionalProperties: + items: + description: ServerState is the server state of cluster, which + takes from Zookeeper AdminServer + properties: + address: + type: string + packets_received: + type: integer + packets_sent: + type: integer + required: + - address + - packets_received + - packets_sent + type: object + type: array + description: Servers is the server list with state of cluster + type: object + required: + - readyReplicas type: object type: object served: true diff --git a/controllers/zookeepercluster_controller.go b/controllers/zookeepercluster_controller.go index b04b324..0806922 100644 --- a/controllers/zookeepercluster_controller.go +++ b/controllers/zookeepercluster_controller.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" @@ -85,7 +86,7 @@ func (r *ZookeeperClusterReconciler) Reconcile(ctx context.Context, req ctrl.Req } { if err := fn(ctx, zk); err != nil { if err == ErrResultRequeue { - return ctrl.Result{Requeue: true}, nil + return ctrl.Result{RequeueAfter: time.Second}, nil } return ctrl.Result{}, err } @@ -206,14 +207,12 @@ func (r *ZookeeperClusterReconciler) reconcileZookeeperClusterStatus(ctx context } if len(actualPods.Items) > 0 && len(actualPods.Items[0].Status.HostIP) > 0 { - zk.Status.Address = fmt.Sprintf("%s:%d", actualPods.Items[0].Status.HostIP, actualServiceClient.Spec.Ports[0].NodePort) + zk.Status.Endpoint = fmt.Sprintf("%s:%d", actualPods.Items[0].Status.HostIP, actualServiceClient.Spec.Ports[0].NodePort) } - if zk.Status.Nodes == nil { - zk.Status.Nodes = make(map[string]string) - } + zk.Status.ReadyReplicas = 0 + zk.Status.Servers = make(map[string][]zookeeperv1alpha1.ServerState) - readyReplicas := 0 for _, pod := range actualPods.Items { podIP := pod.Status.PodIP if len(podIP) == 0 { @@ -226,17 +225,28 @@ func (r *ZookeeperClusterReconciler) reconcileZookeeperClusterStatus(ctx context continue } - zk.Status.Nodes[podIP] = zkStat.ServerStats.ServerState - readyReplicas++ - } + if zkStat.Error != nil { + r.Logger.Info(fmt.Sprintf("Get Zookeeper stat error: %v", zkStat.Error)) + continue + } - zk.Status.ReadyReplicas = int32(readyReplicas) + if _, exist := zk.Status.Servers[zkStat.ServerStats.ServerState]; !exist { + zk.Status.Servers[zkStat.ServerStats.ServerState] = make([]zookeeperv1alpha1.ServerState, 0) + } + zk.Status.Servers[zkStat.ServerStats.ServerState] = append(zk.Status.Servers[zkStat.ServerStats.ServerState], zookeeperv1alpha1.ServerState{ + Address: podIP, + PacketsSent: zkStat.ServerStats.PacketsSent, + PacketsReceived: zkStat.ServerStats.PacketsReceived, + }) + + zk.Status.ReadyReplicas++ + } if err := r.Status().Update(ctx, zk); err != nil { return err } - if zk.Spec.Replicas == zk.Status.ReadyReplicas && zk.Spec.Replicas == int32(len(zk.Status.Nodes)) { + if zk.Spec.Replicas == zk.Status.ReadyReplicas { return nil } @@ -254,6 +264,10 @@ func (r *ZookeeperClusterReconciler) createHeadlessService(zk *zookeeperv1alpha1 }, Spec: corev1.ServiceSpec{ Ports: []corev1.ServicePort{ + { + Name: "client", + Port: 2181, + }, { Name: "server", Port: 2888,