Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate cluster attributes to use v3 backend #11427

Merged
merged 5 commits into from
Dec 9, 2019
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
etcdserver: use v3 to publish member attr
  • Loading branch information
jingyih authored and YoyinZyc committed Dec 6, 2019
commit 5cd2502ab15302064fc53b71a8d8174ba4f05c6f
14 changes: 14 additions & 0 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ import (
"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/auth"
"go.etcd.io/etcd/etcdserver/api"
"go.etcd.io/etcd/etcdserver/api/membership"
"go.etcd.io/etcd/etcdserver/api/membership/membershippb"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/lease"
@@ -53,6 +54,7 @@ type applyResult struct {
// applierV3Internal is the interface for processing internal V3 raft request
type applierV3Internal interface {
ClusterVersionSet(r *membershippb.ClusterVersionSetRequest)
ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest)
}

// applierV3 is the interface for processing V3 raft messages
@@ -185,6 +187,8 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList)
case r.ClusterVersionSet != nil:
a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet)
case r.ClusterMemberAttrSet != nil:
a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet)
default:
panic("not implemented")
}
@@ -852,6 +856,16 @@ func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRe
a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability)
}

func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest) {
a.s.cluster.UpdateAttributes(
types.ID(r.Member_ID),
membership.Attributes{
Name: r.MemberAttributes.Name,
ClientURLs: r.MemberAttributes.ClientUrls,
},
)
}

type quotaApplierV3 struct {
applierV3
q Quota
63 changes: 62 additions & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
@@ -732,7 +732,7 @@ func (s *EtcdServer) adjustTicks() {
func (s *EtcdServer) Start() {
s.start()
s.goAttach(func() { s.adjustTicks() })
s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
s.goAttach(func() { s.publishV3(s.Cfg.ReqTimeout()) })
s.goAttach(s.purgeFile)
s.goAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) })
s.goAttach(s.monitorVersions)
@@ -1992,6 +1992,67 @@ func (s *EtcdServer) sync(timeout time.Duration) {
})
}

// publishV3 registers server information into the cluster using v3 request. The
// information is the JSON representation of this server's member struct, updated
// with the static clientURLs of the server.
// The function keeps attempting to register until it succeeds,
// or its server is stopped.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a TODO to replace publish() in etcd 3.6?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

func (s *EtcdServer) publishV3(timeout time.Duration) {
req := &membershippb.ClusterMemberAttrSetRequest{
Member_ID: uint64(s.id),
MemberAttributes: &membershippb.Attributes{
Name: s.attributes.Name,
ClientUrls: s.attributes.ClientURLs,
},
}

for {
select {
case <-s.stopping:
if lg := s.getLogger(); lg != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we update getLogger to return zap.NewNop(), so we don't need all these manual nil checks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Will fix other getLogger in a separate pr.

lg.Warn(
"stopped publish because server is stopping",
zap.String("local-member-id", s.ID().String()),
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
zap.Duration("publish-timeout", timeout),
)
}
return

default:
}

ctx, cancel := context.WithTimeout(s.ctx, timeout)
_, err := s.raftRequest(ctx, pb.InternalRaftRequest{ClusterMemberAttrSet: req})
cancel()
switch err {
case nil:
close(s.readych)
if lg := s.getLogger(); lg != nil {
lg.Info(
"published local member to cluster through raft",
zap.String("local-member-id", s.ID().String()),
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
zap.String("cluster-id", s.cluster.ID().String()),
zap.Duration("publish-timeout", timeout),
)
}
return

default:
if lg := s.getLogger(); lg != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above. Can we store variable once at top (before for loop), and use the same logger?

lg.Warn(
"failed to publish local member to cluster through raft",
zap.String("local-member-id", s.ID().String()),
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
zap.Duration("publish-timeout", timeout),
zap.Error(err),
)
}
}
}
}

// publish registers server information into the cluster. The information
// is the JSON representation of this server's member struct, updated with the
// static clientURLs of the server.