Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lease: force leader to apply its pending committed index for lease op… #7015

Merged
merged 3 commits into from
Dec 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions etcdserver/api/v2http/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ const (
// NewPeerHandler generates an http.Handler to handle etcd peer requests.
func NewPeerHandler(s *etcdserver.EtcdServer) http.Handler {
var lh http.Handler
if l := s.Lessor(); l != nil {
lh = leasehttp.NewHandler(l)
l := s.Lessor()
if l != nil {
lh = leasehttp.NewHandler(l, func() <-chan struct{} { return s.ApplyWait() })
}
return newPeerHandler(s.Cluster(), s.RaftHandler(), lh)
}
Expand Down
15 changes: 15 additions & 0 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ func (r *raftNode) start(rh *raftReadyHandler) {
raftDone: raftDone,
}

updateCommittedIndex(&ap, rh)

select {
case r.applyc <- ap:
case <-r.stopped:
Expand Down Expand Up @@ -231,6 +233,19 @@ func (r *raftNode) start(rh *raftReadyHandler) {
}()
}

func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
var ci uint64
if len(ap.entries) != 0 {
ci = ap.entries[len(ap.entries)-1].Index
}
if ap.snapshot.Metadata.Index > ci {
ci = ap.snapshot.Metadata.Index
}
if ci != 0 {
rh.updateCommittedIndex(ci)
}
}

func (r *raftNode) sendMessages(ms []raftpb.Message) {
sentAppResp := false
for i := len(ms) - 1; i >= 0; i-- {
Expand Down
21 changes: 10 additions & 11 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,8 @@ func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler()

func (s *EtcdServer) Lessor() lease.Lessor { return s.lessor }

func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) }

func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
if s.cluster.IsIDRemoved(types.ID(m.From)) {
plog.Warningf("reject message from removed member %s", types.ID(m.From).String())
Expand Down Expand Up @@ -596,7 +598,8 @@ type etcdProgress struct {
// and helps decouple state machine logic from Raft algorithms.
// TODO: add a state machine interface to apply the commit entries and do snapshot/recover
type raftReadyHandler struct {
leadershipUpdate func()
leadershipUpdate func()
updateCommittedIndex func(uint64)
}

func (s *EtcdServer) run() {
Expand Down Expand Up @@ -646,6 +649,12 @@ func (s *EtcdServer) run() {
s.r.td.Reset()
}
},
updateCommittedIndex: func(ci uint64) {
cci := s.getCommittedIndex()
if ci > cci {
s.setCommittedIndex(ci)
}
},
}
s.r.start(rh)

Expand Down Expand Up @@ -699,16 +708,6 @@ func (s *EtcdServer) run() {
for {
select {
case ap := <-s.r.apply():
var ci uint64
if len(ap.entries) != 0 {
ci = ap.entries[len(ap.entries)-1].Index
}
if ap.snapshot.Metadata.Index > ci {
ci = ap.snapshot.Metadata.Index
}
if ci != 0 {
s.setCommittedIndex(ci)
}
f := func(context.Context) { s.applyAll(&ep, &ap) }
sched.Schedule(f)
case leases := <-expiredLeaseC:
Expand Down
85 changes: 85 additions & 0 deletions integration/v3_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,91 @@ func TestV3LeaseExists(t *testing.T) {
}
}

// TestV3LeaseRenewStress keeps creating lease and renewing it immediately to ensure the renewal goes through.
// it was oberserved that the immediate lease renewal after granting a lease from follower resulted lease not found.
// related issue https://github.com/coreos/etcd/issues/6978
func TestV3LeaseRenewStress(t *testing.T) {
testLeaseStress(t, stressLeaseRenew)
}

// TestV3LeaseTimeToLiveStress keeps creating lease and retriving it immediately to ensure the lease can be retrived.
// it was oberserved that the immediate lease retrival after granting a lease from follower resulted lease not found.
// related issue https://github.com/coreos/etcd/issues/6978
func TestV3LeaseTimeToLiveStress(t *testing.T) {
testLeaseStress(t, stressLeaseTimeToLive)
}

func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
errc := make(chan error)

for i := 0; i < 30; i++ {
for j := 0; j < 3; j++ {
go func(i int) { errc <- stresser(ctx, toGRPC(clus.Client(i)).Lease) }(j)
}
}

for i := 0; i < 90; i++ {
if err := <-errc; err != nil {
t.Fatal(err)
}
}
}

func stressLeaseRenew(tctx context.Context, lc pb.LeaseClient) (reterr error) {
defer func() {
if tctx.Err() != nil {
reterr = nil
}
}()
lac, err := lc.LeaseKeepAlive(tctx)
if err != nil {
return err
}
for tctx.Err() == nil {
resp, gerr := lc.LeaseGrant(tctx, &pb.LeaseGrantRequest{TTL: 60})
if gerr != nil {
continue
}
err = lac.Send(&pb.LeaseKeepAliveRequest{ID: resp.ID})
if err != nil {
continue
}
rresp, rxerr := lac.Recv()
if rxerr != nil {
continue
}
if rresp.TTL == 0 {
return fmt.Errorf("TTL shouldn't be 0 so soon")
}
}
return nil
}

func stressLeaseTimeToLive(tctx context.Context, lc pb.LeaseClient) (reterr error) {
defer func() {
if tctx.Err() != nil {
reterr = nil
}
}()
for tctx.Err() == nil {
resp, gerr := lc.LeaseGrant(tctx, &pb.LeaseGrantRequest{TTL: 60})
if gerr != nil {
continue
}
_, kerr := lc.LeaseTimeToLive(tctx, &pb.LeaseTimeToLiveRequest{ID: resp.ID})
if rpctypes.Error(kerr) == rpctypes.ErrLeaseNotFound {
return kerr
}
}
return nil
}

func TestV3PutOnNonExistLease(t *testing.T) {
defer testutil.AfterTest(t)
clus := NewClusterV3(t, &ClusterConfig{Size: 1})
Expand Down
37 changes: 32 additions & 5 deletions lease/leasehttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package leasehttp

import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"net/http"
"time"

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/lease"
Expand All @@ -30,14 +32,19 @@ import (
var (
LeasePrefix = "/leases"
LeaseInternalPrefix = "/leases/internal"
applyTimeout = time.Second
ErrLeaseHTTPTimeout = errors.New("waiting for node to catch up its applied index has timed out")
)

// NewHandler returns an http Handler for lease renewals
func NewHandler(l lease.Lessor) http.Handler {
return &leaseHandler{l}
func NewHandler(l lease.Lessor, waitch func() <-chan struct{}) http.Handler {
return &leaseHandler{l, waitch}
}

type leaseHandler struct{ l lease.Lessor }
type leaseHandler struct {
l lease.Lessor
waitch func() <-chan struct{}
}

func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
Expand All @@ -59,6 +66,12 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, "error unmarshalling request", http.StatusBadRequest)
return
}
select {
case <-h.waitch():
case <-time.After(applyTimeout):
http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
return
}
ttl, err := h.l.Renew(lease.LeaseID(lreq.ID))
if err != nil {
if err == lease.ErrLeaseNotFound {
Expand All @@ -83,7 +96,12 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, "error unmarshalling request", http.StatusBadRequest)
return
}

select {
case <-h.waitch():
case <-time.After(applyTimeout):
http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
return
}
l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
if l == nil {
http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound)
Expand Down Expand Up @@ -148,6 +166,10 @@ func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundT
return -1, err
}

if resp.StatusCode == http.StatusRequestTimeout {
return -1, ErrLeaseHTTPTimeout
}

if resp.StatusCode == http.StatusNotFound {
return -1, lease.ErrLeaseNotFound
}
Expand Down Expand Up @@ -184,7 +206,8 @@ func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string

cc := &http.Client{Transport: rt}
var b []byte
errc := make(chan error)
// buffer errc channel so that errc don't block inside the go routinue
errc := make(chan error, 2)
go func() {
resp, err := cc.Do(req)
if err != nil {
Expand All @@ -196,6 +219,10 @@ func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string
errc <- err
return
}
if resp.StatusCode == http.StatusRequestTimeout {
errc <- ErrLeaseHTTPTimeout
return
}
if resp.StatusCode == http.StatusNotFound {
errc <- lease.ErrLeaseNotFound
return
Expand Down
53 changes: 51 additions & 2 deletions lease/leasehttp/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ import (
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"
"time"

"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc/backend"

"golang.org/x/net/context"
)

Expand All @@ -38,7 +40,7 @@ func TestRenewHTTP(t *testing.T) {
t.Fatalf("failed to create lease: %v", err)
}

ts := httptest.NewServer(NewHandler(le))
ts := httptest.NewServer(NewHandler(le, waitReady))
defer ts.Close()

ttl, err := RenewHTTP(context.TODO(), l.ID, ts.URL+LeasePrefix, http.DefaultTransport)
Expand All @@ -62,7 +64,7 @@ func TestTimeToLiveHTTP(t *testing.T) {
t.Fatalf("failed to create lease: %v", err)
}

ts := httptest.NewServer(NewHandler(le))
ts := httptest.NewServer(NewHandler(le, waitReady))
defer ts.Close()

resp, err := TimeToLiveHTTP(context.TODO(), l.ID, true, ts.URL+LeaseInternalPrefix, http.DefaultTransport)
Expand All @@ -76,3 +78,50 @@ func TestTimeToLiveHTTP(t *testing.T) {
t.Fatalf("granted TTL expected 5, got %d", resp.LeaseTimeToLiveResponse.GrantedTTL)
}
}

func TestRenewHTTPTimeout(t *testing.T) {
testApplyTimeout(t, func(l *lease.Lease, serverURL string) error {
_, err := RenewHTTP(context.TODO(), l.ID, serverURL+LeasePrefix, http.DefaultTransport)
return err
})
}

func TestTimeToLiveHTTPTimeout(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is very similar to TestRenewHTTPTimeout both should probably call into a common function that has Renew/TimeToLive passed in

testApplyTimeout(t, func(l *lease.Lease, serverURL string) error {
_, err := TimeToLiveHTTP(context.TODO(), l.ID, true, serverURL+LeaseInternalPrefix, http.DefaultTransport)
return err
})
}

func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
be, tmpPath := backend.NewTmpBackend(time.Hour, 10000)
defer os.Remove(tmpPath)
defer be.Close()

le := lease.NewLessor(be, int64(5))
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
t.Fatalf("failed to create lease: %v", err)
}

ts := httptest.NewServer(NewHandler(le, waitNotReady))
defer ts.Close()
err = f(l, ts.URL)
if err == nil {
t.Fatalf("expected timeout error, got nil")
}
if strings.Compare(err.Error(), ErrLeaseHTTPTimeout.Error()) != 0 {
t.Fatalf("expected (%v), got (%v)", ErrLeaseHTTPTimeout.Error(), err.Error())
}
}

func waitReady() <-chan struct{} {
ch := make(chan struct{})
close(ch)
return ch
}

func waitNotReady() <-chan struct{} {
return nil
}