Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: refactor consistentindex #11699

Merged
merged 3 commits into from
Mar 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 9 additions & 16 deletions auth/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"go.etcd.io/etcd/auth/authpb"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/etcdserver/cindex"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/mvcc/backend"

Expand Down Expand Up @@ -91,9 +92,6 @@ type AuthenticateParamIndex struct{}
// AuthenticateParamSimpleTokenPrefix is used for a key of context in the parameters of Authenticate()
type AuthenticateParamSimpleTokenPrefix struct{}

// saveConsistentIndexFunc is used to sync consistentIndex to backend, now reusing store.saveIndex
type saveConsistentIndexFunc func(tx backend.BatchTx)

// AuthStore defines auth storage interface.
type AuthStore interface {
// AuthEnable turns on the authentication feature
Expand Down Expand Up @@ -186,9 +184,6 @@ type AuthStore interface {

// HasRole checks that user has role
HasRole(user, role string) bool

// SetConsistentIndexSyncer sets consistentIndex syncer
SetConsistentIndexSyncer(syncer saveConsistentIndexFunc)
}

type TokenProvider interface {
Expand All @@ -212,14 +207,11 @@ type authStore struct {

rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions

tokenProvider TokenProvider
syncConsistentIndex saveConsistentIndexFunc
bcryptCost int // the algorithm cost / strength for hashing auth passwords
tokenProvider TokenProvider
bcryptCost int // the algorithm cost / strength for hashing auth passwords
ci cindex.ConsistentIndexer
}

func (as *authStore) SetConsistentIndexSyncer(syncer saveConsistentIndexFunc) {
as.syncConsistentIndex = syncer
}
func (as *authStore) AuthEnable() error {
as.enabledMu.Lock()
defer as.enabledMu.Unlock()
Expand Down Expand Up @@ -1018,7 +1010,7 @@ func (as *authStore) IsAuthEnabled() bool {
}

// NewAuthStore creates a new AuthStore.
func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCost int) *authStore {
func NewAuthStore(lg *zap.Logger, be backend.Backend, ci cindex.ConsistentIndexer, tp TokenProvider, bcryptCost int) *authStore {
if lg == nil {
lg = zap.NewNop()
}
Expand Down Expand Up @@ -1053,6 +1045,7 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo
revision: getRevision(tx),
lg: lg,
be: be,
ci: ci,
enabled: enabled,
rangePermCache: make(map[string]*unifiedRangePermissions),
tokenProvider: tp,
Expand Down Expand Up @@ -1314,10 +1307,10 @@ func (as *authStore) BcryptCost() int {
}

func (as *authStore) saveConsistentIndex(tx backend.BatchTx) {
if as.syncConsistentIndex != nil {
as.syncConsistentIndex(tx)
if as.ci != nil {
as.ci.UnsafeSave(tx)
} else {
as.lg.Error("failed to save consistentIndex,syncConsistentIndex is nil")
as.lg.Error("failed to save consistentIndex,consistentIndexer is nil")
}
}

Expand Down
16 changes: 8 additions & 8 deletions auth/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestNewAuthStoreRevision(t *testing.T) {
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
err = enableAuthAndCreateRoot(as)
if err != nil {
t.Fatal(err)
Expand All @@ -63,7 +63,7 @@ func TestNewAuthStoreRevision(t *testing.T) {

// no changes to commit
b2 := backend.NewDefaultBackend(tPath)
as = NewAuthStore(zap.NewExample(), b2, tp, bcrypt.MinCost)
as = NewAuthStore(zap.NewExample(), b2, nil, tp, bcrypt.MinCost)
new := as.Revision()
as.Close()
b2.Close()
Expand All @@ -85,7 +85,7 @@ func TestNewAuthStoreBcryptCost(t *testing.T) {

invalidCosts := [2]int{bcrypt.MinCost - 1, bcrypt.MaxCost + 1}
for _, invalidCost := range invalidCosts {
as := NewAuthStore(zap.NewExample(), b, tp, invalidCost)
as := NewAuthStore(zap.NewExample(), b, nil, tp, invalidCost)
if as.BcryptCost() != bcrypt.DefaultCost {
t.Fatalf("expected DefaultCost when bcryptcost is invalid")
}
Expand All @@ -102,7 +102,7 @@ func setupAuthStore(t *testing.T) (store *authStore, teardownfunc func(t *testin
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
err = enableAuthAndCreateRoot(as)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -703,7 +703,7 @@ func TestAuthInfoFromCtxRace(t *testing.T) {
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
defer as.Close()

donec := make(chan struct{})
Expand Down Expand Up @@ -769,7 +769,7 @@ func TestRecoverFromSnapshot(t *testing.T) {
if err != nil {
t.Fatal(err)
}
as2 := NewAuthStore(zap.NewExample(), as.be, tp, bcrypt.MinCost)
as2 := NewAuthStore(zap.NewExample(), as.be, nil, tp, bcrypt.MinCost)
defer func(a *authStore) {
a.Close()
}(as2)
Expand Down Expand Up @@ -851,7 +851,7 @@ func TestRolesOrder(t *testing.T) {
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
err = enableAuthAndCreateRoot(as)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -906,7 +906,7 @@ func testAuthInfoFromCtxWithRoot(t *testing.T, opts string) {
if err != nil {
t.Fatal(err)
}
as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost)
as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost)
defer as.Close()

if err = enableAuthAndCreateRoot(as); err != nil {
Expand Down
10 changes: 3 additions & 7 deletions clientv3/snapshot/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

package snapshot

import "encoding/binary"
import (
"encoding/binary"
)

type revision struct {
main int64
Expand All @@ -27,9 +29,3 @@ func bytesToRev(bytes []byte) revision {
sub: int64(binary.BigEndian.Uint64(bytes[9:])),
}
}

// initIndex implements ConsistentIndexGetter so the snapshot won't block
// the new raft instance by waiting for a future raft index.
type initIndex int

func (i *initIndex) ConsistentIndex() uint64 { return uint64(*i) }
5 changes: 4 additions & 1 deletion clientv3/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"go.etcd.io/etcd/etcdserver/api/membership"
"go.etcd.io/etcd/etcdserver/api/snap"
"go.etcd.io/etcd/etcdserver/api/v2store"
"go.etcd.io/etcd/etcdserver/cindex"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/mvcc"
Expand Down Expand Up @@ -384,7 +385,9 @@ func (s *v3Manager) saveDB() error {
// a lessor never timeouts leases
lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})

mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
ci := cindex.NewConsistentIndex(be.BatchTx())
ci.SetConsistentIndex(uint64(commit))
tangcong marked this conversation as resolved.
Show resolved Hide resolved
mvs := mvcc.NewStore(s.lg, be, lessor, ci, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
txn := mvs.Write(traceutil.TODO())
btx := be.BatchTx()
del := func(k, v []byte) error {
Expand Down
5 changes: 3 additions & 2 deletions etcdserver/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"go.etcd.io/etcd/etcdserver/api/snap"
"go.etcd.io/etcd/etcdserver/cindex"
"go.etcd.io/etcd/lease"
"go.etcd.io/etcd/mvcc"
"go.etcd.io/etcd/mvcc/backend"
Expand Down Expand Up @@ -94,8 +95,8 @@ func openBackend(cfg ServerConfig) backend.Backend {
// violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this
// case, replace the db with the snapshot db sent by the leader.
func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
var cIndex consistentIndex
kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, nil, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
ci := cindex.NewConsistentIndex(oldbe.BatchTx())
kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, ci, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
defer kv.Close()
if snapshot.Metadata.Index <= kv.ConsistentIndex() {
return oldbe, nil
Expand Down
114 changes: 114 additions & 0 deletions etcdserver/cindex/cindex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cindex

import (
"encoding/binary"
"sync"
"sync/atomic"

"go.etcd.io/etcd/mvcc/backend"
)

var (
metaBucketName = []byte("meta")

consistentIndexKeyName = []byte("consistent_index")
)

// ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex.
type ConsistentIndexer interface {

tangcong marked this conversation as resolved.
Show resolved Hide resolved
// ConsistentIndex returns the consistent index of current executing entry.
ConsistentIndex() uint64

// SetConsistentIndex set the consistent index of current executing entry.
SetConsistentIndex(v uint64)

// UnsafeSave must be called holding the lock on the tx.
// It saves consistentIndex to the underlying stable storage.
UnsafeSave(tx backend.BatchTx)

// SetBatchTx set the available backend.BatchTx for ConsistentIndexer.
SetBatchTx(tx backend.BatchTx)
}

// consistentIndex implements the ConsistentIndexer interface.
type consistentIndex struct {
tx backend.BatchTx
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more question. Do we need to move tx below consistentIndex to keep consistentIndex 64-bit aligned?

Copy link
Contributor Author

@tangcong tangcong Mar 25, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sizeof(interface) is 8 bytes in 32 bit,16 bytes in 64 bit. so we do not need to move tx below consistentIndex.
https://play.golang.org/p/9Bj6zdtyehz

// consistentIndex represents the offset of an entry in a consistent replica log.
// it caches the "consistent_index" key's value. Accessed
// through atomics so must be 64-bit aligned.
consistentIndex uint64
// bytesBuf8 is a byte slice of length 8
// to avoid a repetitive allocation in saveIndex.
bytesBuf8 []byte
mutex sync.Mutex
}

func NewConsistentIndex(tx backend.BatchTx) ConsistentIndexer {
return &consistentIndex{tx: tx, bytesBuf8: make([]byte, 8)}
}

func (ci *consistentIndex) ConsistentIndex() uint64 {

if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 {
return index
}
ci.mutex.Lock()
defer ci.mutex.Unlock()
ci.tx.Lock()
defer ci.tx.Unlock()
_, vs := ci.tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
if len(vs) == 0 {
return 0
}
v := binary.BigEndian.Uint64(vs[0])
atomic.StoreUint64(&ci.consistentIndex, v)
return v
}

func (ci *consistentIndex) SetConsistentIndex(v uint64) {
atomic.StoreUint64(&ci.consistentIndex, v)
}

func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
bs := ci.bytesBuf8
binary.BigEndian.PutUint64(bs, ci.consistentIndex)
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
}

func (ci *consistentIndex) SetBatchTx(tx backend.BatchTx) {
ci.mutex.Lock()
defer ci.mutex.Unlock()
ci.tx = tx
}

func NewFakeConsistentIndex(index uint64) ConsistentIndexer {
return &fakeConsistentIndex{index: index}
}

type fakeConsistentIndex struct{ index uint64 }

func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index }

func (f *fakeConsistentIndex) SetConsistentIndex(index uint64) {
atomic.StoreUint64(&f.index, index)
}

func (f *fakeConsistentIndex) UnsafeSave(tx backend.BatchTx) {}
func (f *fakeConsistentIndex) SetBatchTx(tx backend.BatchTx) {}
Loading