Skip to content

Commit

Permalink
lease: force leader to apply its pending committed index for lease op…
Browse files Browse the repository at this point in the history
…erations

suppose a lease granting request from a follower goes through and followed by a lease look up or renewal, the leader might not apply the lease grant request locally. So the leader might not find the lease from the lease look up or renewal request which will result lease not found error. To fix this issue, we force the leader to apply its pending commited index before looking up lease.

FIX #6978
  • Loading branch information
fanminshi committed Dec 16, 2016
1 parent 86a4384 commit d8849cb
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 8 deletions.
5 changes: 3 additions & 2 deletions etcdserver/api/v2http/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ const (
// NewPeerHandler generates an http.Handler to handle etcd peer requests.
func NewPeerHandler(s *etcdserver.EtcdServer) http.Handler {
var lh http.Handler
if l := s.Lessor(); l != nil {
lh = leasehttp.NewHandler(l)
l := s.Lessor()
if l != nil {
lh = leasehttp.NewHandler(l, func() <-chan struct{} { return s.ApplyWait() })
}
return newPeerHandler(s.Cluster(), s.RaftHandler(), lh)
}
Expand Down
6 changes: 6 additions & 0 deletions etcdserver/api/v3rpc/rpctypes/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ var (
ErrGRPCLeaseNotFound = grpc.Errorf(codes.NotFound, "etcdserver: requested lease not found")
ErrGRPCLeaseExist = grpc.Errorf(codes.FailedPrecondition, "etcdserver: lease already exists")

ErrGRPCLeaseHTTPTimeout = grpc.Errorf(codes.Unavailable, "etcdserver: lease http request timed out")

ErrGRPCMemberExist = grpc.Errorf(codes.FailedPrecondition, "etcdserver: member ID already exist")
ErrGRPCPeerURLExist = grpc.Errorf(codes.FailedPrecondition, "etcdserver: Peer URLs already exists")
ErrGRPCMemberBadURLs = grpc.Errorf(codes.InvalidArgument, "etcdserver: given member URLs are invalid")
Expand Down Expand Up @@ -70,6 +72,8 @@ var (
grpc.ErrorDesc(ErrGRPCLeaseNotFound): ErrGRPCLeaseNotFound,
grpc.ErrorDesc(ErrGRPCLeaseExist): ErrGRPCLeaseExist,

grpc.ErrorDesc(ErrGRPCLeaseHTTPTimeout): ErrGRPCLeaseHTTPTimeout,

grpc.ErrorDesc(ErrGRPCMemberExist): ErrGRPCMemberExist,
grpc.ErrorDesc(ErrGRPCPeerURLExist): ErrGRPCPeerURLExist,
grpc.ErrorDesc(ErrGRPCMemberBadURLs): ErrGRPCMemberBadURLs,
Expand Down Expand Up @@ -110,6 +114,8 @@ var (
ErrLeaseNotFound = Error(ErrGRPCLeaseNotFound)
ErrLeaseExist = Error(ErrGRPCLeaseExist)

ErrLeaseHTTPTimeout = Error(ErrGRPCLeaseHTTPTimeout)

ErrMemberExist = Error(ErrGRPCMemberExist)
ErrPeerURLExist = Error(ErrGRPCPeerURLExist)
ErrMemberBadURLs = Error(ErrGRPCMemberBadURLs)
Expand Down
4 changes: 4 additions & 0 deletions etcdserver/api/v3rpc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/lease/leasehttp"
"github.com/coreos/etcd/mvcc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -65,6 +66,9 @@ func togRPCError(err error) error {
case lease.ErrLeaseExists:
return rpctypes.ErrGRPCLeaseExist

case leasehttp.ErrLeaseHTTPTimeout:
return rpctypes.ErrGRPCLeaseHTTPTimeout

case auth.ErrRootUserNotExist:
return rpctypes.ErrGRPCRootUserNotExist
case auth.ErrRootRoleNotExist:
Expand Down
2 changes: 2 additions & 0 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,8 @@ func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler()

func (s *EtcdServer) Lessor() lease.Lessor { return s.lessor }

func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) }

func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
if s.cluster.IsIDRemoved(types.ID(m.From)) {
plog.Warningf("reject message from removed member %s", types.ID(m.From).String())
Expand Down
77 changes: 77 additions & 0 deletions integration/v3_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,83 @@ func TestV3LeaseExists(t *testing.T) {
}
}

// TestV3LeaseRenewStress keeps creating lease and renewing it immediately to ensure the renewal goes through.
// it was oberserved that the immediate lease renewal after granting a lease from follower resulted lease not found.
// related issue https://github.com/coreos/etcd/issues/6978
func TestV3LeaseRenewStress(t *testing.T) {
testLeaseStress(t, stressLeaseRenew)
}

// TestV3LeaseTimeToLiveStress keeps creating lease and retriving it immediately to ensure the lease can be retrived.
// it was oberserved that the immediate lease retrival after granting a lease from follower resulted lease not found.
// related issue https://github.com/coreos/etcd/issues/6978
func TestV3LeaseTimeToLiveStress(t *testing.T) {
testLeaseStress(t, stressLeaseTimeToLive)
}

func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
errc := make(chan error)

for i := 0; i < 30; i++ {
for j := 0; j < 3; j++ {
go func(i int) { errc <- stresser(ctx, toGRPC(clus.Client(i)).Lease) }(j)
}
}

for i := 0; i < 90; i++ {
if err := <-errc; err != nil {
t.Fatal(err)
}
}
}

func stressLeaseRenew(tctx context.Context, lc pb.LeaseClient) error {
cctx, cancel := context.WithCancel(context.Background())
defer cancel()
lac, err := lc.LeaseKeepAlive(cctx)
if err != nil {
return err
}
for tctx.Err() == nil {
resp, gerr := lc.LeaseGrant(cctx, &pb.LeaseGrantRequest{TTL: 60})
if gerr != nil {
return gerr
}
err = lac.Send(&pb.LeaseKeepAliveRequest{ID: resp.ID})
if err != nil {
return err
}
rresp, rxerr := lac.Recv()
if rxerr != nil {
return rxerr
}
if rresp.TTL == 0 {
return fmt.Errorf("TTL shouldn't be 0 so soon")
}
}
return nil
}

func stressLeaseTimeToLive(tctx context.Context, lc pb.LeaseClient) error {
for tctx.Err() == nil {
resp, gerr := lc.LeaseGrant(context.Background(), &pb.LeaseGrantRequest{TTL: 60})
if gerr != nil {
return gerr
}
_, kerr := lc.LeaseTimeToLive(context.Background(), &pb.LeaseTimeToLiveRequest{ID: resp.ID})
if kerr != nil {
return kerr
}
}
return nil
}

func TestV3PutOnNonExistLease(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
Expand Down
34 changes: 30 additions & 4 deletions lease/leasehttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package leasehttp

import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"net/http"
"time"

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/lease"
Expand All @@ -30,14 +32,19 @@ import (
var (
LeasePrefix = "/leases"
LeaseInternalPrefix = "/leases/internal"
applyTimeout = time.Second
ErrLeaseHTTPTimeout = errors.New("waiting for node to catch up its applied index has timed out")
)

// NewHandler returns an http Handler for lease renewals
func NewHandler(l lease.Lessor) http.Handler {
return &leaseHandler{l}
func NewHandler(l lease.Lessor, waitch func() <-chan struct{}) http.Handler {
return &leaseHandler{l, waitch}
}

type leaseHandler struct{ l lease.Lessor }
type leaseHandler struct {
l lease.Lessor
waitch func() <-chan struct{}
}

func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
Expand All @@ -59,6 +66,12 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, "error unmarshalling request", http.StatusBadRequest)
return
}
select {
case <-h.waitch():
case <-time.After(applyTimeout):
http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
return
}
ttl, err := h.l.Renew(lease.LeaseID(lreq.ID))
if err != nil {
if err == lease.ErrLeaseNotFound {
Expand All @@ -83,7 +96,12 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, "error unmarshalling request", http.StatusBadRequest)
return
}

select {
case <-h.waitch():
case <-time.After(applyTimeout):
http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
return
}
l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
if l == nil {
http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound)
Expand Down Expand Up @@ -148,6 +166,10 @@ func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundT
return -1, err
}

if resp.StatusCode == http.StatusRequestTimeout {
return -1, ErrLeaseHTTPTimeout
}

if resp.StatusCode == http.StatusNotFound {
return -1, lease.ErrLeaseNotFound
}
Expand Down Expand Up @@ -196,6 +218,10 @@ func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string
errc <- err
return
}
if resp.StatusCode == http.StatusRequestTimeout {
errc <- ErrLeaseHTTPTimeout
return
}
if resp.StatusCode == http.StatusNotFound {
errc <- lease.ErrLeaseNotFound
return
Expand Down
52 changes: 50 additions & 2 deletions lease/leasehttp/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import (

"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/pkg/wait"

"strings"

"golang.org/x/net/context"
)

Expand All @@ -32,13 +36,15 @@ func TestRenewHTTP(t *testing.T) {
defer be.Close()

le := lease.NewLessor(be, int64(5))
w := wait.NewTimeList()
w.Trigger(2)
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
t.Fatalf("failed to create lease: %v", err)
}

ts := httptest.NewServer(NewHandler(le))
ts := httptest.NewServer(NewHandler(le, func() <-chan struct{} { return w.Wait(2) }))
defer ts.Close()

ttl, err := RenewHTTP(context.TODO(), l.ID, ts.URL+LeasePrefix, http.DefaultTransport)
Expand All @@ -62,7 +68,10 @@ func TestTimeToLiveHTTP(t *testing.T) {
t.Fatalf("failed to create lease: %v", err)
}

ts := httptest.NewServer(NewHandler(le))
w := wait.NewTimeList()
// trigger committed index 2 to simulate the case that node has catched up to the latest CommittedIndex 2 returned by indexerMock
w.Trigger(2)
ts := httptest.NewServer(NewHandler(le, func() <-chan struct{} { return w.Wait(2) }))
defer ts.Close()

resp, err := TimeToLiveHTTP(context.TODO(), l.ID, true, ts.URL+LeaseInternalPrefix, http.DefaultTransport)
Expand All @@ -76,3 +85,42 @@ func TestTimeToLiveHTTP(t *testing.T) {
t.Fatalf("granted TTL expected 5, got %d", resp.LeaseTimeToLiveResponse.GrantedTTL)
}
}

func testTimeout(t *testing.T, f func(*lease.Lease, *httptest.Server) error) {
be, tmpPath := backend.NewTmpBackend(time.Hour, 10000)
defer os.Remove(tmpPath)
defer be.Close()

le := lease.NewLessor(be, int64(5))
w := wait.NewTimeList()
// don't trigger anything, make the RenewHTTP to time out
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
t.Fatalf("failed to create lease: %v", err)
}

ts := httptest.NewServer(NewHandler(le, func() <-chan struct{} { return w.Wait(2) }))
defer ts.Close()
err = f(l, ts)
if err == nil {
t.Fatalf("expected timeout error, got nil")
}
if strings.Compare(err.Error(), ErrLeaseHTTPTimeout.Error()) != 0 {
t.Fatalf("expected (%v), got (%v)", ErrLeaseHTTPTimeout.Error(), err.Error())
}
}

func TestRenewHTTPTimeout(t *testing.T) {
testTimeout(t, func(l *lease.Lease, ts *httptest.Server) error {
_, err := RenewHTTP(context.TODO(), l.ID, ts.URL+LeasePrefix, http.DefaultTransport)
return err
})
}

func TestTimeToLiveHTTPTimeout(t *testing.T) {
testTimeout(t, func(l *lease.Lease, ts *httptest.Server) error {
_, err := TimeToLiveHTTP(context.TODO(), l.ID, true, ts.URL+LeaseInternalPrefix, http.DefaultTransport)
return err
})
}

0 comments on commit d8849cb

Please sign in to comment.