Skip to content

Commit

Permalink
Merge pull request #7015 from fanminshi/fix_lease_expired_too_soon
Browse files Browse the repository at this point in the history
lease: force leader to apply its pending committed index for lease op…
  • Loading branch information
fanminshi committed Dec 27, 2016
2 parents 17873f7 + 2faf72f commit 89b18ff
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 20 deletions.
5 changes: 3 additions & 2 deletions etcdserver/api/v2http/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ const (
// NewPeerHandler generates an http.Handler to handle etcd peer requests.
func NewPeerHandler(s *etcdserver.EtcdServer) http.Handler {
var lh http.Handler
if l := s.Lessor(); l != nil {
lh = leasehttp.NewHandler(l)
l := s.Lessor()
if l != nil {
lh = leasehttp.NewHandler(l, func() <-chan struct{} { return s.ApplyWait() })
}
return newPeerHandler(s.Cluster(), s.RaftHandler(), lh)
}
Expand Down
15 changes: 15 additions & 0 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ func (r *raftNode) start(rh *raftReadyHandler) {
raftDone: raftDone,
}

updateCommittedIndex(&ap, rh)

select {
case r.applyc <- ap:
case <-r.stopped:
Expand Down Expand Up @@ -231,6 +233,19 @@ func (r *raftNode) start(rh *raftReadyHandler) {
}()
}

func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
var ci uint64
if len(ap.entries) != 0 {
ci = ap.entries[len(ap.entries)-1].Index
}
if ap.snapshot.Metadata.Index > ci {
ci = ap.snapshot.Metadata.Index
}
if ci != 0 {
rh.updateCommittedIndex(ci)
}
}

func (r *raftNode) sendMessages(ms []raftpb.Message) {
sentAppResp := false
for i := len(ms) - 1; i >= 0; i-- {
Expand Down
21 changes: 10 additions & 11 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,8 @@ func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler()

func (s *EtcdServer) Lessor() lease.Lessor { return s.lessor }

func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) }

func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
if s.cluster.IsIDRemoved(types.ID(m.From)) {
plog.Warningf("reject message from removed member %s", types.ID(m.From).String())
Expand Down Expand Up @@ -596,7 +598,8 @@ type etcdProgress struct {
// and helps decouple state machine logic from Raft algorithms.
// TODO: add a state machine interface to apply the commit entries and do snapshot/recover
type raftReadyHandler struct {
leadershipUpdate func()
leadershipUpdate func()
updateCommittedIndex func(uint64)
}

func (s *EtcdServer) run() {
Expand Down Expand Up @@ -646,6 +649,12 @@ func (s *EtcdServer) run() {
s.r.td.Reset()
}
},
updateCommittedIndex: func(ci uint64) {
cci := s.getCommittedIndex()
if ci > cci {
s.setCommittedIndex(ci)
}
},
}
s.r.start(rh)

Expand Down Expand Up @@ -699,16 +708,6 @@ func (s *EtcdServer) run() {
for {
select {
case ap := <-s.r.apply():
var ci uint64
if len(ap.entries) != 0 {
ci = ap.entries[len(ap.entries)-1].Index
}
if ap.snapshot.Metadata.Index > ci {
ci = ap.snapshot.Metadata.Index
}
if ci != 0 {
s.setCommittedIndex(ci)
}
f := func(context.Context) { s.applyAll(&ep, &ap) }
sched.Schedule(f)
case leases := <-expiredLeaseC:
Expand Down
85 changes: 85 additions & 0 deletions integration/v3_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,91 @@ func TestV3LeaseExists(t *testing.T) {
}
}

// TestV3LeaseRenewStress keeps creating lease and renewing it immediately to ensure the renewal goes through.
// it was oberserved that the immediate lease renewal after granting a lease from follower resulted lease not found.
// related issue https://github.com/coreos/etcd/issues/6978
func TestV3LeaseRenewStress(t *testing.T) {
testLeaseStress(t, stressLeaseRenew)
}

// TestV3LeaseTimeToLiveStress keeps creating lease and retriving it immediately to ensure the lease can be retrived.
// it was oberserved that the immediate lease retrival after granting a lease from follower resulted lease not found.
// related issue https://github.com/coreos/etcd/issues/6978
func TestV3LeaseTimeToLiveStress(t *testing.T) {
testLeaseStress(t, stressLeaseTimeToLive)
}

func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
errc := make(chan error)

for i := 0; i < 30; i++ {
for j := 0; j < 3; j++ {
go func(i int) { errc <- stresser(ctx, toGRPC(clus.Client(i)).Lease) }(j)
}
}

for i := 0; i < 90; i++ {
if err := <-errc; err != nil {
t.Fatal(err)
}
}
}

func stressLeaseRenew(tctx context.Context, lc pb.LeaseClient) (reterr error) {
defer func() {
if tctx.Err() != nil {
reterr = nil
}
}()
lac, err := lc.LeaseKeepAlive(tctx)
if err != nil {
return err
}
for tctx.Err() == nil {
resp, gerr := lc.LeaseGrant(tctx, &pb.LeaseGrantRequest{TTL: 60})
if gerr != nil {
continue
}
err = lac.Send(&pb.LeaseKeepAliveRequest{ID: resp.ID})
if err != nil {
continue
}
rresp, rxerr := lac.Recv()
if rxerr != nil {
continue
}
if rresp.TTL == 0 {
return fmt.Errorf("TTL shouldn't be 0 so soon")
}
}
return nil
}

func stressLeaseTimeToLive(tctx context.Context, lc pb.LeaseClient) (reterr error) {
defer func() {
if tctx.Err() != nil {
reterr = nil
}
}()
for tctx.Err() == nil {
resp, gerr := lc.LeaseGrant(tctx, &pb.LeaseGrantRequest{TTL: 60})
if gerr != nil {
continue
}
_, kerr := lc.LeaseTimeToLive(tctx, &pb.LeaseTimeToLiveRequest{ID: resp.ID})
if rpctypes.Error(kerr) == rpctypes.ErrLeaseNotFound {
return kerr
}
}
return nil
}

func TestV3PutOnNonExistLease(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
Expand Down
37 changes: 32 additions & 5 deletions lease/leasehttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package leasehttp

import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"net/http"
"time"

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/lease"
Expand All @@ -30,14 +32,19 @@ import (
var (
LeasePrefix = "/leases"
LeaseInternalPrefix = "/leases/internal"
applyTimeout = time.Second
ErrLeaseHTTPTimeout = errors.New("waiting for node to catch up its applied index has timed out")
)

// NewHandler returns an http Handler for lease renewals
func NewHandler(l lease.Lessor) http.Handler {
return &leaseHandler{l}
func NewHandler(l lease.Lessor, waitch func() <-chan struct{}) http.Handler {
return &leaseHandler{l, waitch}
}

type leaseHandler struct{ l lease.Lessor }
type leaseHandler struct {
l lease.Lessor
waitch func() <-chan struct{}
}

func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
Expand All @@ -59,6 +66,12 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, "error unmarshalling request", http.StatusBadRequest)
return
}
select {
case <-h.waitch():
case <-time.After(applyTimeout):
http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
return
}
ttl, err := h.l.Renew(lease.LeaseID(lreq.ID))
if err != nil {
if err == lease.ErrLeaseNotFound {
Expand All @@ -83,7 +96,12 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, "error unmarshalling request", http.StatusBadRequest)
return
}

select {
case <-h.waitch():
case <-time.After(applyTimeout):
http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
return
}
l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
if l == nil {
http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound)
Expand Down Expand Up @@ -148,6 +166,10 @@ func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundT
return -1, err
}

if resp.StatusCode == http.StatusRequestTimeout {
return -1, ErrLeaseHTTPTimeout
}

if resp.StatusCode == http.StatusNotFound {
return -1, lease.ErrLeaseNotFound
}
Expand Down Expand Up @@ -184,7 +206,8 @@ func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string

cc := &http.Client{Transport: rt}
var b []byte
errc := make(chan error)
// buffer errc channel so that errc don't block inside the go routinue
errc := make(chan error, 2)
go func() {
resp, err := cc.Do(req)
if err != nil {
Expand All @@ -196,6 +219,10 @@ func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string
errc <- err
return
}
if resp.StatusCode == http.StatusRequestTimeout {
errc <- ErrLeaseHTTPTimeout
return
}
if resp.StatusCode == http.StatusNotFound {
errc <- lease.ErrLeaseNotFound
return
Expand Down
53 changes: 51 additions & 2 deletions lease/leasehttp/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ import (
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"
"time"

"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc/backend"

"golang.org/x/net/context"
)

Expand All @@ -38,7 +40,7 @@ func TestRenewHTTP(t *testing.T) {
t.Fatalf("failed to create lease: %v", err)
}

ts := httptest.NewServer(NewHandler(le))
ts := httptest.NewServer(NewHandler(le, waitReady))
defer ts.Close()

ttl, err := RenewHTTP(context.TODO(), l.ID, ts.URL+LeasePrefix, http.DefaultTransport)
Expand All @@ -62,7 +64,7 @@ func TestTimeToLiveHTTP(t *testing.T) {
t.Fatalf("failed to create lease: %v", err)
}

ts := httptest.NewServer(NewHandler(le))
ts := httptest.NewServer(NewHandler(le, waitReady))
defer ts.Close()

resp, err := TimeToLiveHTTP(context.TODO(), l.ID, true, ts.URL+LeaseInternalPrefix, http.DefaultTransport)
Expand All @@ -76,3 +78,50 @@ func TestTimeToLiveHTTP(t *testing.T) {
t.Fatalf("granted TTL expected 5, got %d", resp.LeaseTimeToLiveResponse.GrantedTTL)
}
}

func TestRenewHTTPTimeout(t *testing.T) {
testApplyTimeout(t, func(l *lease.Lease, serverURL string) error {
_, err := RenewHTTP(context.TODO(), l.ID, serverURL+LeasePrefix, http.DefaultTransport)
return err
})
}

func TestTimeToLiveHTTPTimeout(t *testing.T) {
testApplyTimeout(t, func(l *lease.Lease, serverURL string) error {
_, err := TimeToLiveHTTP(context.TODO(), l.ID, true, serverURL+LeaseInternalPrefix, http.DefaultTransport)
return err
})
}

func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
be, tmpPath := backend.NewTmpBackend(time.Hour, 10000)
defer os.Remove(tmpPath)
defer be.Close()

le := lease.NewLessor(be, int64(5))
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
t.Fatalf("failed to create lease: %v", err)
}

ts := httptest.NewServer(NewHandler(le, waitNotReady))
defer ts.Close()
err = f(l, ts.URL)
if err == nil {
t.Fatalf("expected timeout error, got nil")
}
if strings.Compare(err.Error(), ErrLeaseHTTPTimeout.Error()) != 0 {
t.Fatalf("expected (%v), got (%v)", ErrLeaseHTTPTimeout.Error(), err.Error())
}
}

func waitReady() <-chan struct{} {
ch := make(chan struct{})
close(ch)
return ch
}

func waitNotReady() <-chan struct{} {
return nil
}

0 comments on commit 89b18ff

Please sign in to comment.