Skip to content

Commit

Permalink
Merge pull request #11753 from tangcong/automated-cherry-pick-of-#11652-
Browse files Browse the repository at this point in the history
#11670-#11710-origin-release-3.3

Automated cherry pick of #11652 #11670 #11710
  • Loading branch information
jingyih committed Apr 10, 2020
2 parents 9fd7e2b + 294e714 commit 7e20b9f
Show file tree
Hide file tree
Showing 14 changed files with 155 additions and 48 deletions.
42 changes: 42 additions & 0 deletions auth/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package auth

import (
"github.com/prometheus/client_golang/prometheus"
"sync"
)

var (
currentAuthRevision = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "etcd_debugging",
Subsystem: "auth",
Name: "revision",
Help: "The current revision of auth store.",
},
func() float64 {
reportCurrentAuthRevMu.RLock()
defer reportCurrentAuthRevMu.RUnlock()
return reportCurrentAuthRev()
},
)
// overridden by auth store initialization
reportCurrentAuthRevMu sync.RWMutex
reportCurrentAuthRev = func() float64 { return 0 }
)

func init() {
prometheus.MustRegister(currentAuthRevision)
}
50 changes: 47 additions & 3 deletions auth/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ type AuthenticateParamIndex struct{}
// AuthenticateParamSimpleTokenPrefix is used for a key of context in the parameters of Authenticate()
type AuthenticateParamSimpleTokenPrefix struct{}

// saveConsistentIndexFunc is used to sync consistentIndex to backend, now reusing store.saveIndex
type saveConsistentIndexFunc func(tx backend.BatchTx)

// AuthStore defines auth storage interface.
type AuthStore interface {
// AuthEnable turns on the authentication feature
AuthEnable() error
Expand Down Expand Up @@ -178,6 +182,9 @@ type AuthStore interface {

// HasRole checks that user has role
HasRole(user, role string) bool

// SetConsistentIndexSyncer sets consistentIndex syncer
SetConsistentIndexSyncer(syncer saveConsistentIndexFunc)
}

type TokenProvider interface {
Expand All @@ -200,9 +207,13 @@ type authStore struct {

rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions

tokenProvider TokenProvider
tokenProvider TokenProvider
syncConsistentIndex saveConsistentIndexFunc
}

func (as *authStore) SetConsistentIndexSyncer(syncer saveConsistentIndexFunc) {
as.syncConsistentIndex = syncer
}
func (as *authStore) AuthEnable() error {
as.enabledMu.Lock()
defer as.enabledMu.Unlock()
Expand Down Expand Up @@ -252,6 +263,7 @@ func (as *authStore) AuthDisable() {
tx.Lock()
tx.UnsafePut(authBucketName, enableFlagKey, authDisabled)
as.commitRevision(tx)
as.saveConsistentIndex(tx)
tx.Unlock()
b.ForceCommit()

Expand Down Expand Up @@ -368,6 +380,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse,
putUser(tx, newUser)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

plog.Noticef("added a new user: %s", r.Name)

Expand All @@ -392,6 +405,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete
delUser(tx, r.Name)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.invalidateCachedPerm(r.Name)
as.tokenProvider.invalidateUser(r.Name)
Expand Down Expand Up @@ -428,6 +442,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p
putUser(tx, updatedUser)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.invalidateCachedPerm(r.Name)
as.tokenProvider.invalidateUser(r.Name)
Expand Down Expand Up @@ -468,6 +483,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser
as.invalidateCachedPerm(r.User)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

plog.Noticef("granted role %s to user %s", r.Role, r.User)
return &pb.AuthUserGrantRoleResponse{}, nil
Expand Down Expand Up @@ -536,6 +552,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
as.invalidateCachedPerm(r.Name)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

plog.Noticef("revoked role %s from user %s", r.Role, r.Name)
return &pb.AuthUserRevokeRoleResponse{}, nil
Expand Down Expand Up @@ -600,6 +617,7 @@ func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest)
as.clearCachedPerm()

as.commitRevision(tx)
as.saveConsistentIndex(tx)

plog.Noticef("revoked key %s from role %s", r.Key, r.Role)
return &pb.AuthRoleRevokePermissionResponse{}, nil
Expand Down Expand Up @@ -645,6 +663,7 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete
}

as.commitRevision(tx)
as.saveConsistentIndex(tx)

plog.Noticef("deleted role %s", r.Role)
return &pb.AuthRoleDeleteResponse{}, nil
Expand All @@ -667,6 +686,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
putRole(tx, newRole)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

plog.Noticef("Role %s is created", r.Name)

Expand Down Expand Up @@ -727,6 +747,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
as.clearCachedPerm()

as.commitRevision(tx)
as.saveConsistentIndex(tx)

plog.Noticef("role %s's permission of key %s is updated as %s", r.Name, r.Perm.Key, authpb.Permission_Type_name[int32(r.Perm.PermType)])

Expand All @@ -743,8 +764,13 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE
if revision == 0 {
return ErrUserEmpty
}

if revision < as.Revision() {
rev := as.Revision()
if revision < rev {
plog.Warningf("request auth revision is less than current node auth revision,"+
"current node auth revision is %d,"+
"request auth revision is %d,"+
"request key is %s, "+
"err is %v", rev, revision, key, ErrAuthOldRevision)
return ErrAuthOldRevision
}

Expand Down Expand Up @@ -933,6 +959,8 @@ func NewAuthStore(be backend.Backend, tp TokenProvider) *authStore {
as.commitRevision(tx)
}

as.setupMetricsReporter()

tx.Unlock()
be.ForceCommit()

Expand Down Expand Up @@ -1134,3 +1162,19 @@ func (as *authStore) HasRole(user, role string) bool {

return false
}

func (as *authStore) saveConsistentIndex(tx backend.BatchTx) {
if as.syncConsistentIndex != nil {
as.syncConsistentIndex(tx)
} else {
plog.Errorf("failed to save consistentIndex,syncConsistentIndex is nil")
}
}

func (as *authStore) setupMetricsReporter() {
reportCurrentAuthRevMu.Lock()
reportCurrentAuthRev = func() float64 {
return float64(as.Revision())
}
reportCurrentAuthRevMu.Unlock()
}
3 changes: 3 additions & 0 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
ar := &applyResult{}
defer func(start time.Time) {
warnOfExpensiveRequest(start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
if ar.err != nil {
warnOfFailedRequest(start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
}
}(time.Now())

// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
Expand Down
2 changes: 1 addition & 1 deletion etcdserver/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func openBackend(cfg ServerConfig) backend.Backend {
// case, replace the db with the snapshot db sent by the leader.
func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
var cIndex consistentIndex
kv := mvcc.New(oldbe, &lease.FakeLessor{}, &cIndex)
kv := mvcc.New(oldbe, &lease.FakeLessor{}, nil, &cIndex)
defer kv.Close()
if snapshot.Metadata.Index <= kv.ConsistentIndex() {
return oldbe, nil
Expand Down
24 changes: 13 additions & 11 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,19 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds())))
srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)

tp, err := auth.NewTokenProvider(cfg.AuthToken,
func(index uint64) <-chan struct{} {
return srv.applyWait.Wait(index)
},
)
if err != nil {
plog.Warningf("failed to create token provider,err is %v", err)
return nil, err
}
srv.authStore = auth.NewAuthStore(srv.be, tp)

srv.kv = mvcc.New(srv.be, srv.lessor, srv.authStore, &srv.consistIndex)
if beExist {
kvindex := srv.kv.ConsistentIndex()
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
Expand All @@ -470,16 +482,6 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
}()

srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
tp, err := auth.NewTokenProvider(cfg.AuthToken,
func(index uint64) <-chan struct{} {
return srv.applyWait.Wait(index)
},
)
if err != nil {
plog.Errorf("failed to create token provider: %s", err)
return nil, err
}
srv.authStore = auth.NewAuthStore(srv.be, tp)
if num := cfg.AutoCompactionRetention; num != 0 {
srv.compactor, err = compactor.New(cfg.AutoCompactionMode, num, srv.kv, srv)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ func TestSnapshot(t *testing.T) {
r: *r,
store: st,
}
srv.kv = mvcc.New(be, &lease.FakeLessor{}, &srv.consistIndex)
srv.kv = mvcc.New(be, &lease.FakeLessor{}, nil, &srv.consistIndex)
srv.be = be

ch := make(chan struct{}, 2)
Expand Down Expand Up @@ -994,7 +994,7 @@ func TestSnapshotOrdering(t *testing.T) {

be, tmpPath := backend.NewDefaultTmpBackend()
defer os.RemoveAll(tmpPath)
s.kv = mvcc.New(be, &lease.FakeLessor{}, &s.consistIndex)
s.kv = mvcc.New(be, &lease.FakeLessor{}, nil, &s.consistIndex)
s.be = be

s.start()
Expand Down Expand Up @@ -1052,7 +1052,7 @@ func TestTriggerSnap(t *testing.T) {
}
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}

srv.kv = mvcc.New(be, &lease.FakeLessor{}, &srv.consistIndex)
srv.kv = mvcc.New(be, &lease.FakeLessor{}, nil, &srv.consistIndex)
srv.be = be

srv.start()
Expand Down Expand Up @@ -1121,7 +1121,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
defer func() {
os.RemoveAll(tmpPath)
}()
s.kv = mvcc.New(be, &lease.FakeLessor{}, &s.consistIndex)
s.kv = mvcc.New(be, &lease.FakeLessor{}, nil, &s.consistIndex)
s.be = be

s.start()
Expand Down
9 changes: 9 additions & 0 deletions etcdserver/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ func warnOfExpensiveRequest(now time.Time, reqStringer fmt.Stringer, respMsg pro
warnOfExpensiveGenericRequest(now, reqStringer, "", resp, err)
}

func warnOfFailedRequest(now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
var resp string
if !isNil(respMsg) {
resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
}
d := time.Since(now)
plog.Warningf("failed to apply request,took %v,request %s,resp %s,err is %v", d, reqStringer.String(), resp, err)
}

func warnOfExpensiveReadOnlyTxnRequest(now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
reqStringer := pb.NewLoggableTxnRequest(r)
var resp string
Expand Down
8 changes: 5 additions & 3 deletions integration/v3_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,9 +781,11 @@ func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) {

type eventsSortByKey []*mvccpb.Event

func (evs eventsSortByKey) Len() int { return len(evs) }
func (evs eventsSortByKey) Swap(i, j int) { evs[i], evs[j] = evs[j], evs[i] }
func (evs eventsSortByKey) Less(i, j int) bool { return bytes.Compare(evs[i].Kv.Key, evs[j].Kv.Key) < 0 }
func (evs eventsSortByKey) Len() int { return len(evs) }
func (evs eventsSortByKey) Swap(i, j int) { evs[i], evs[j] = evs[j], evs[i] }
func (evs eventsSortByKey) Less(i, j int) bool {
return bytes.Compare(evs[i].Kv.Key, evs[j].Kv.Key) < 0
}

func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) {
defer testutil.AfterTest(t)
Expand Down
2 changes: 1 addition & 1 deletion mvcc/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ func TestKVSnapshot(t *testing.T) {

func TestWatchableKVWatch(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil, nil))
defer cleanup(s, b, tmpPath)

w := s.NewWatchStream()
Expand Down
11 changes: 8 additions & 3 deletions mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sync"
"time"

"github.com/coreos/etcd/auth"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb"
Expand Down Expand Up @@ -67,11 +68,11 @@ type watchableStore struct {
// cancel operations.
type cancelFunc func()

func New(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV {
return newWatchableStore(b, le, ig)
func New(b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter) ConsistentWatchableKV {
return newWatchableStore(b, le, as, ig)
}

func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *watchableStore {
func newWatchableStore(b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter) *watchableStore {
s := &watchableStore{
store: NewStore(b, le, ig),
victimc: make(chan struct{}, 1),
Expand All @@ -85,6 +86,10 @@ func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGet
// use this store as the deleter so revokes trigger watch events
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
}
if as != nil {
// TODO: encapsulating consistentindex into a separate package
as.SetConsistentIndexSyncer(s.store.saveIndex)
}
s.wg.Add(2)
go s.syncWatchersLoop()
go s.syncVictimsLoop()
Expand Down
Loading

0 comments on commit 7e20b9f

Please sign in to comment.