Skip to content

Commit

Permalink
etcdserver: support read index
Browse files Browse the repository at this point in the history
Use read index to achieve l-read.
  • Loading branch information
xiang90 committed Sep 26, 2016
1 parent f5b9238 commit fa8db4b
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 2 deletions.
8 changes: 7 additions & 1 deletion e2e/ctl_v3_elect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package e2e

import (
"fmt"
"os"
"strings"
"testing"
Expand All @@ -23,7 +24,12 @@ import (
"github.com/coreos/etcd/pkg/expect"
)

func TestCtlV3Elect(t *testing.T) { testCtl(t, testElect) }
func TestCtlV3Elect(t *testing.T) {
for i := 0; ; i++ {
fmt.Println(i)
testCtl(t, testElect)
}
}

func testElect(cx ctlCtx) {
name := "a"
Expand Down
11 changes: 11 additions & 0 deletions etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ type raftNode struct {
// a chan to send out apply
applyc chan apply

// a chan to send out readState
readStateC chan raft.ReadState

// TODO: remove the etcdserver related logic from raftNode
// TODO: add a state machine interface to apply the commit entries
// and do snapshot/recover
Expand Down Expand Up @@ -196,6 +199,14 @@ func (r *raftNode) start(s *EtcdServer) {
}
}

if len(rd.ReadStates) != 0 {
select {
case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
case <-r.stopped:
return
}
}

raftDone := make(chan struct{}, 1)
ap := apply{
entries: rd.CommittedEntries,
Expand Down
16 changes: 15 additions & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,22 @@ type EtcdServer struct {
snapCount uint64

w wait.Wait

readMu sync.RWMutex
// read routine notifies etcd server that it waits for reading by sending an empty struct to
// readwaitC
readwaitc chan struct{}
// readNotifier is used to notify the read routine that it can process the request
// when there is no error
readNotifier *notifier

// stop signals the run goroutine should shutdown.
stop chan struct{}
// stopping is closed by run goroutine on shutdown.
stopping chan struct{}
// done is closed when all goroutines from start() complete.
done chan struct{}
done chan struct{}

errorc chan error
id types.ID
attributes membership.Attributes
Expand Down Expand Up @@ -391,6 +401,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
ticker: time.Tick(time.Duration(cfg.TickMs) * time.Millisecond),
raftStorage: s,
storage: NewStorage(w, ss),
readStateC: make(chan raft.ReadState, 1),
},
id: id,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
Expand Down Expand Up @@ -478,6 +489,7 @@ func (s *EtcdServer) Start() {
s.goAttach(s.purgeFile)
s.goAttach(func() { monitorFileDescriptor(s.stopping) })
s.goAttach(s.monitorVersions)
s.goAttach(s.linearizableReadLoop)
}

// start prepares and starts server in a new goroutine. It is no longer safe to
Expand All @@ -493,6 +505,8 @@ func (s *EtcdServer) start() {
s.done = make(chan struct{})
s.stop = make(chan struct{})
s.stopping = make(chan struct{})
s.readwaitc = make(chan struct{}, 1)
s.readNotifier = newNotifier()
if s.ClusterVersion() != nil {
plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String()))
} else {
Expand Down
16 changes: 16 additions & 0 deletions etcdserver/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,19 @@ func longestConnected(tp rafthttp.Transporter, membs []types.ID) (types.ID, bool
}
return longest, true
}

type notifier struct {
c chan struct{}
err error
}

func newNotifier() *notifier {
return &notifier{
c: make(chan struct{}, 0),
}
}

func (nc *notifier) notify(err error) {
nc.err = err
close(nc.c)
}
127 changes: 127 additions & 0 deletions etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package etcdserver

import (
"bytes"
"encoding/binary"
"strconv"
"strings"
"time"
Expand All @@ -26,6 +28,9 @@ import (
"github.com/coreos/etcd/lease/leasehttp"
"github.com/coreos/etcd/lease/leasepb"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/raft"

"github.com/coreos/go-semver/semver"
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
)
Expand All @@ -44,6 +49,10 @@ const (
maxGapBetweenApplyAndCommitIndex = 1000
)

var (
newRangeClusterVersion = *semver.Must(semver.NewVersion("3.1.0"))
)

type RaftKV interface {
Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
Expand Down Expand Up @@ -86,6 +95,31 @@ type Authenticator interface {
}

func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
// TODO: remove this checking when we release etcd 3.2
if s.ClusterVersion() == nil || s.ClusterVersion().LessThan(newRangeClusterVersion) {
return s.legacyRange(ctx, r)
}

if !r.Serializable {
err := s.linearizableReadNotify(ctx)
if err != nil {
return nil, err
}
}
var resp *pb.RangeResponse
var err error
chk := func(ai *auth.AuthInfo) error {
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}
get := func() { resp, err = s.applyV3Base.Range(noTxn, r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
}
return resp, err
}

// TODO: remove this func when we release etcd 3.2
func (s *EtcdServer) legacyRange(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
if r.Serializable {
var resp *pb.RangeResponse
var err error
Expand Down Expand Up @@ -641,3 +675,96 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern

// Watchable returns a watchable interface attached to the etcdserver.
func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }

func (s *EtcdServer) linearizableReadLoop() {
var rs raft.ReadState
internalTimeout := time.Second

for {
ctx := make([]byte, 8)
binary.BigEndian.PutUint64(ctx, s.reqIDGen.Next())

select {
case <-s.readwaitc:
case <-s.stopping:
return
}

nextnr := newNotifier()

s.readMu.Lock()
nr := s.readNotifier
s.readNotifier = nextnr
s.readMu.Unlock()

cctx, cancel := context.WithTimeout(context.Background(), internalTimeout)
if err := s.r.ReadIndex(cctx, ctx); err != nil {
cancel()
if err == raft.ErrStopped {
return
}
plog.Errorf("failed to get read index from raft: %v", err)
nr.notify(err)
continue
}
cancel()

var (
timeout bool
done bool
)
for !timeout && !done {
select {
case rs = <-s.r.readStateC:
if !bytes.Equal(rs.RequestCtx, ctx) {
// a previous request might time out. now we should ignore the response of it and
// continue waiting for the response of the current requests.
plog.Warningf("ignored out-of-date read index response (want %v, got %v)", rs.RequestCtx, ctx)
} else {
done = true
}
case <-time.After(internalTimeout):
plog.Warningf("timed out waiting for read index response")
nr.notify(ErrTimeout)
timeout = true
case <-s.stopping:
return
}
}
if !done {
continue
}

if ai := s.getAppliedIndex(); ai < rs.Index {
select {
case <-s.applyWait.Wait(rs.Index):
case <-s.stopping:
return
}
}
// unblock all l-reads requested at indices before rs.Index
nr.notify(nil)
}
}

func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
s.readMu.RLock()
nc := s.readNotifier
s.readMu.RUnlock()

// signal linearizable loop for current notify if it hasn't been already
select {
case s.readwaitc <- struct{}{}:
default:
}

// wait for read state notification
select {
case <-nc.c:
return nc.err
case <-ctx.Done():
return ctx.Err()
case <-s.done:
return ErrStopped
}
}

0 comments on commit fa8db4b

Please sign in to comment.