Skip to content

Commit

Permalink
lease: force leader to apply its pending committed index for lease op…
Browse files Browse the repository at this point in the history
…erations

suppose a lease granting request from a follower goes through and followed by a lease look up or renewal, the leader might not apply the lease grant request locally. So the leader might not find the lease from the lease look up or renewal request which will result lease not found error. To fix this issue, we force the leader to apply its pending commited index before looking up lease.

FIX etcd-io#6978
  • Loading branch information
fanminshi committed Dec 15, 2016
1 parent 86a4384 commit 850811b
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 14 deletions.
6 changes: 4 additions & 2 deletions etcdserver/api/v2http/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ const (
// NewPeerHandler generates an http.Handler to handle etcd peer requests.
func NewPeerHandler(s *etcdserver.EtcdServer) http.Handler {
var lh http.Handler
if l := s.Lessor(); l != nil {
lh = leasehttp.NewHandler(l)
l := s.Lessor()
w := s.ApplyWait()
if l != nil && w != nil {
lh = leasehttp.NewHandler(l, w, s)
}
return newPeerHandler(s.Cluster(), s.RaftHandler(), lh)
}
Expand Down
6 changes: 6 additions & 0 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,8 @@ func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler()

func (s *EtcdServer) Lessor() lease.Lessor { return s.lessor }

func (s *EtcdServer) ApplyWait() wait.WaitTime { return s.applyWait }

func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
if s.cluster.IsIDRemoved(types.ID(m.From)) {
plog.Warningf("reject message from removed member %s", types.ID(m.From).String())
Expand Down Expand Up @@ -1613,6 +1615,10 @@ func (s *EtcdServer) getCommittedIndex() uint64 {
return atomic.LoadUint64(&s.committedIndex)
}

func (s *EtcdServer) GetCommittedIndex() uint64 {
return atomic.LoadUint64(&s.committedIndex)
}

func (s *EtcdServer) setCommittedIndex(v uint64) {
atomic.StoreUint64(&s.committedIndex, v)
}
Expand Down
3 changes: 1 addition & 2 deletions etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,8 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
if err != lease.ErrNotPrimary {
return -1, err
}

cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()

// renewals don't go through raft; forward to leader manually
for cctx.Err() == nil && err != nil {
leader, lerr := s.waitLeader(cctx)
Expand All @@ -324,6 +322,7 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
}
for _, url := range leader.PeerURLs {
lurl := url + leasehttp.LeasePrefix
// get the applied index of this local node
ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
if err == nil || err == lease.ErrLeaseNotFound {
return ttl, err
Expand Down
101 changes: 101 additions & 0 deletions integration/v3_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc/mvccpb"
Expand Down Expand Up @@ -233,6 +234,106 @@ func TestV3LeaseExists(t *testing.T) {
}
}

// TestV3LeaseRenewStress keeps creating lease and renewing it immediately to ensure the renewal goes through.
// it was oberserved that the immediate lease renewal after granting a lease from follower resulted lease not found.
// related issue https://github.com/coreos/etcd/issues/6978
func TestV3LeaseRenewStress(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
cli1 := clus.Client(0)
cli2 := clus.Client(1)
cli3 := clus.Client(2)

errc := make(chan error)

for i := 0; i < 10; i++ {
go func() {
errc <- stressLeaseRenew(ctx, cli1)
}()
go func() {
errc <- stressLeaseRenew(ctx, cli2)
}()
go func() {
errc <- stressLeaseRenew(ctx, cli3)
}()
}

for i := 0; i < 30; i++ {
if err := <-errc; err != nil {
cancel()
t.Fatal(err)
}
}
}

func stressLeaseRenew(ctx context.Context, cli *clientv3.Client) error {
for ctx.Err() == nil {
resp, gerr := cli.Grant(context.TODO(), 60)
if gerr != nil {
return gerr
}
_, kerr := cli.KeepAliveOnce(context.TODO(), resp.ID)
if kerr != nil {
return kerr
}
}
return nil
}

// TestV3LeaseTimeToLiveStress keeps creating lease and retriving it immediately to ensure the lease can be retrived.
// it was oberserved that the immediate lease retrival after granting a lease from follower resulted lease not found.
// related issue https://github.com/coreos/etcd/issues/6978
func TestV3LeaseTimeToLiveStress(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
cli1 := clus.Client(0)
cli2 := clus.Client(1)
cli3 := clus.Client(2)

errc := make(chan error)

for i := 0; i < 10; i++ {
go func() {
errc <- stressLeaseTimeToLive(ctx, cli1)
}()
go func() {
errc <- stressLeaseTimeToLive(ctx, cli2)
}()
go func() {
errc <- stressLeaseTimeToLive(ctx, cli3)
}()
}

for i := 0; i < 30; i++ {
if err := <-errc; err != nil {
cancel()
t.Fatal(err)
}
}
}

func stressLeaseTimeToLive(ctx context.Context, cli *clientv3.Client) error {
for ctx.Err() == nil {
resp, gerr := cli.Grant(context.TODO(), 60)
if gerr != nil {
return gerr
}
_, kerr := cli.TimeToLive(context.TODO(), resp.ID)
if kerr != nil {
return kerr
}
}
return nil
}

func TestV3PutOnNonExistLease(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
Expand Down
33 changes: 29 additions & 4 deletions lease/leasehttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,37 @@ import (
"fmt"
"io/ioutil"
"net/http"
"time"

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/lease/leasepb"
"github.com/coreos/etcd/pkg/httputil"
"github.com/coreos/etcd/pkg/wait"
"golang.org/x/net/context"
)

var (
LeasePrefix = "/leases"
LeaseInternalPrefix = "/leases/internal"
applyTimeout = time.Second
)

// NewHandler returns an http Handler for lease renewals
func NewHandler(l lease.Lessor) http.Handler {
return &leaseHandler{l}
func NewHandler(l lease.Lessor, w wait.WaitTime, i Indexer) http.Handler {
return &leaseHandler{l, w, i}
}

type leaseHandler struct{ l lease.Lessor }
type Indexer interface {
// GetCommittedIndex returns the current CommittedIndex of this etcd node
GetCommittedIndex() uint64
}

type leaseHandler struct {
l lease.Lessor
w wait.WaitTime
i Indexer
}

func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
Expand All @@ -59,6 +71,13 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, "error unmarshalling request", http.StatusBadRequest)
return
}
cci := h.i.GetCommittedIndex()
select {
case <-h.w.Wait(cci):
case <-time.After(applyTimeout):
http.Error(w, "waiting for node to catch up its applied index has timed out", http.StatusInternalServerError)
return
}
ttl, err := h.l.Renew(lease.LeaseID(lreq.ID))
if err != nil {
if err == lease.ErrLeaseNotFound {
Expand All @@ -83,7 +102,13 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, "error unmarshalling request", http.StatusBadRequest)
return
}

cci := h.i.GetCommittedIndex()
select {
case <-h.w.Wait(cci):
case <-time.After(applyTimeout):
http.Error(w, "waiting for node to catch up its applied index has timed out", http.StatusInternalServerError)
return
}
l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
if l == nil {
http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound)
Expand Down
79 changes: 77 additions & 2 deletions lease/leasehttp/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,41 @@ import (
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"
"time"

"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/pkg/wait"

"golang.org/x/net/context"
)

type indexerMock struct {
}

func (i indexerMock) GetCommittedIndex() uint64 {
// always returns 2 as current commited index. Make sure the wait.WaitTime trigger deadline appropriately for different test cases.
return 2
}

func TestRenewHTTP(t *testing.T) {
be, tmpPath := backend.NewTmpBackend(time.Hour, 10000)
defer os.Remove(tmpPath)
defer be.Close()

le := lease.NewLessor(be, int64(5))
w := wait.NewTimeList()
// trigger committed index 2 to simulate the case that node has catched up to the latest CommittedIndex 2 returned by indexerMock
w.Trigger(2)
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
t.Fatalf("failed to create lease: %v", err)
}

ts := httptest.NewServer(NewHandler(le))
ts := httptest.NewServer(NewHandler(le, w, &indexerMock{}))
defer ts.Close()

ttl, err := RenewHTTP(context.TODO(), l.ID, ts.URL+LeasePrefix, http.DefaultTransport)
Expand All @@ -50,6 +64,35 @@ func TestRenewHTTP(t *testing.T) {
}
}

func TestRenewHTTPTimeout(t *testing.T) {
be, tmpPath := backend.NewTmpBackend(time.Hour, 10000)
defer os.Remove(tmpPath)
defer be.Close()

le := lease.NewLessor(be, int64(5))
w := wait.NewTimeList()
// don't trigger anything, make the RenewHTTP to time out
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
t.Fatalf("failed to create lease: %v", err)
}

ts := httptest.NewServer(NewHandler(le, w, &indexerMock{}))
defer ts.Close()

ttl, err := RenewHTTP(context.TODO(), l.ID, ts.URL+LeasePrefix, http.DefaultTransport)
if err == nil {
t.Fatalf("expected timeout error, got nil")
}
if !strings.Contains(err.Error(), "waiting for node to catch up its applied index has timed out") {
t.Fatalf("expected timeout error, got %v", err)
}
if ttl != -1 {
t.Fatalf("ttl expected -1, got %d", ttl)
}
}

func TestTimeToLiveHTTP(t *testing.T) {
be, tmpPath := backend.NewTmpBackend(time.Hour, 10000)
defer os.Remove(tmpPath)
Expand All @@ -62,7 +105,10 @@ func TestTimeToLiveHTTP(t *testing.T) {
t.Fatalf("failed to create lease: %v", err)
}

ts := httptest.NewServer(NewHandler(le))
w := wait.NewTimeList()
// trigger committed index 2 to simulate the case that node has catched up to the latest CommittedIndex 2 returned by indexerMock
w.Trigger(2)
ts := httptest.NewServer(NewHandler(le, w, &indexerMock{}))
defer ts.Close()

resp, err := TimeToLiveHTTP(context.TODO(), l.ID, true, ts.URL+LeaseInternalPrefix, http.DefaultTransport)
Expand All @@ -76,3 +122,32 @@ func TestTimeToLiveHTTP(t *testing.T) {
t.Fatalf("granted TTL expected 5, got %d", resp.LeaseTimeToLiveResponse.GrantedTTL)
}
}

func TestTimeToLiveHTTPTimeout(t *testing.T) {
be, tmpPath := backend.NewTmpBackend(time.Hour, 10000)
defer os.Remove(tmpPath)
defer be.Close()

le := lease.NewLessor(be, int64(5))
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
t.Fatalf("failed to create lease: %v", err)
}

w := wait.NewTimeList()
// don't trigger anything, make the TimeToLiveHTTP to time out
ts := httptest.NewServer(NewHandler(le, w, &indexerMock{}))
defer ts.Close()

resp, err := TimeToLiveHTTP(context.TODO(), l.ID, true, ts.URL+LeaseInternalPrefix, http.DefaultTransport)
if err == nil {
t.Fatalf("expected timeout error, got nil")
}
if !strings.Contains(err.Error(), "waiting for node to catch up its applied index has timed out") {
t.Fatalf("expected timeout error, got %v", err)
}
if resp != nil {
t.Fatalf("resp expected to be nil, got %d", resp)
}
}
4 changes: 0 additions & 4 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,8 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
} else {
l.forever()
}

le.leaseMap[id] = l
l.persistTo(le.b)

return l, nil
}

Expand Down Expand Up @@ -286,12 +284,10 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {

unlock := func() { le.mu.Unlock() }
defer func() { unlock() }()

if !le.isPrimary() {
// forward renew request to primary instead of returning error.
return -1, ErrNotPrimary
}

demotec := le.demotec

l := le.leaseMap[id]
Expand Down

0 comments on commit 850811b

Please sign in to comment.