forked from aenix-io/etcd-operator
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Extended status check for reconciliation (aenix-io#207)
- Loading branch information
Showing
6 changed files
with
286 additions
and
32 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
package factory | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/aenix-io/etcd-operator/api/v1alpha1" | ||
clientv3 "go.etcd.io/etcd/client/v3" | ||
v1 "k8s.io/api/core/v1" | ||
"k8s.io/apimachinery/pkg/types" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
) | ||
|
||
func NewEtcdClientSet(ctx context.Context, cluster *v1alpha1.EtcdCluster, cli client.Client) (*clientv3.Client, []*clientv3.Client, error) { | ||
cfg, err := configFromCluster(ctx, cluster, cli) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
if len(cfg.Endpoints) == 0 { | ||
return nil, nil, nil | ||
} | ||
eps := cfg.Endpoints | ||
clusterClient, err := clientv3.New(cfg) | ||
if err != nil { | ||
return nil, nil, fmt.Errorf("error building etcd cluster client: %w", err) | ||
} | ||
singleClients := make([]*clientv3.Client, len(eps)) | ||
for i, ep := range eps { | ||
cfg.Endpoints = []string{ep} | ||
singleClients[i], err = clientv3.New(cfg) | ||
if err != nil { | ||
return nil, nil, fmt.Errorf("error building etcd single-endpoint client for endpoint %s: %w", ep, err) | ||
} | ||
} | ||
return clusterClient, singleClients, nil | ||
} | ||
|
||
func configFromCluster(ctx context.Context, cluster *v1alpha1.EtcdCluster, cli client.Client) (clientv3.Config, error) { | ||
ep := v1.Endpoints{} | ||
err := cli.Get(ctx, types.NamespacedName{Name: GetHeadlessServiceName(cluster), Namespace: cluster.Namespace}, &ep) | ||
if client.IgnoreNotFound(err) != nil { | ||
return clientv3.Config{}, err | ||
} | ||
if err != nil { | ||
return clientv3.Config{Endpoints: []string{}}, nil | ||
} | ||
|
||
names := map[string]struct{}{} | ||
urls := make([]string, 0, 8) | ||
for _, v := range ep.Subsets { | ||
for _, addr := range v.Addresses { | ||
names[addr.Hostname] = struct{}{} | ||
} | ||
for _, addr := range v.NotReadyAddresses { | ||
names[addr.Hostname] = struct{}{} | ||
} | ||
} | ||
for name := range names { | ||
urls = append(urls, fmt.Sprintf("%s:%s", name, "2379")) | ||
} | ||
|
||
return clientv3.Config{Endpoints: urls}, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
package controller | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
|
||
clientv3 "go.etcd.io/etcd/client/v3" | ||
appsv1 "k8s.io/api/apps/v1" | ||
corev1 "k8s.io/api/core/v1" | ||
) | ||
|
||
// etcdStatus holds the details of the status that an etcd endpoint | ||
// can return about itself, i.e. its own status and its perceived | ||
// member list | ||
type etcdStatus struct { | ||
endpointStatus *clientv3.StatusResponse | ||
endpointStatusError error | ||
memberList *clientv3.MemberListResponse | ||
memberListError error | ||
} | ||
|
||
// observables stores observations that the operator can make about | ||
// states of objects in kubernetes | ||
type observables struct { | ||
statefulSet appsv1.StatefulSet | ||
stsExists bool | ||
endpointsFound bool | ||
etcdStatuses []etcdStatus | ||
clusterID uint64 | ||
_ int | ||
_ []corev1.PersistentVolumeClaim | ||
} | ||
|
||
// setClusterID populates the clusterID field based on etcdStatuses | ||
func (o *observables) setClusterID() { | ||
for i := range o.etcdStatuses { | ||
if o.etcdStatuses[i].endpointStatus != nil { | ||
o.clusterID = o.etcdStatuses[i].endpointStatus.Header.ClusterId | ||
return | ||
} | ||
} | ||
} | ||
|
||
// inSplitbrain compares clusterID field with clusterIDs in etcdStatuses. | ||
// If more than one unique ID is reported, cluster is in splitbrain. | ||
func (o *observables) inSplitbrain() bool { | ||
for i := range o.etcdStatuses { | ||
if o.etcdStatuses[i].endpointStatus != nil { | ||
if o.clusterID != o.etcdStatuses[i].endpointStatus.Header.ClusterId { | ||
return true | ||
} | ||
} | ||
} | ||
return false | ||
} | ||
|
||
// fill takes a single-endpoint client and populates the fields of etcdStatus | ||
// with the endpoint's status and its perceived member list. | ||
func (s *etcdStatus) fill(ctx context.Context, c *clientv3.Client) { | ||
var wg sync.WaitGroup | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
s.endpointStatus, s.endpointStatusError = c.Status(ctx, c.Endpoints()[0]) | ||
}() | ||
s.memberList, s.memberListError = c.MemberList(ctx) | ||
wg.Wait() | ||
} | ||
|
||
// TODO: make a real function | ||
func (o *observables) _() int { | ||
if o.etcdStatuses != nil { | ||
for i := range o.etcdStatuses { | ||
if o.etcdStatuses[i].memberList != nil { | ||
return len(o.etcdStatuses[i].memberList.Members) | ||
} | ||
} | ||
} | ||
return 0 | ||
} |
Oops, something went wrong.