Skip to content

Commit

Permalink
Split membership sync endpoints and remove PurgeTs endpoint (dgraph-i…
Browse files Browse the repository at this point in the history
…o#2773)

- Now that Zero followers cannot send proposals to leader, Alphas must send their membership updates
  to the Zero leader.
- The membership updates can be received from any Zero server, leader or follower.
- So, this PR splits the job of sending and receiving updates into two different endpoints:
  StreamMembership and UpdateMembership.

- Purge Timestamps are now calculated using the Snapshot timestamp of the group. This is simpler,
  easier to predict and makes this system run like clockwork. That is, a snapshot calculation in
  Alpha leader would cause all Alpha followers to get that snapshot, which would then cause Zero to
  purge the timestamps below that snapshot ts (assuming other groups have caught up).
- This removes the need for Zero to query all Alpha leaders, via PurgeTs endpoint. That is now
  removed.

- Removed a lot of error variables, switching them with explanatory errors about what is going on in
  the system, making things easier to debug.

Commits:

* Working membership endpoint split.
* Removed PurgeTs endpoint. Passing snapshot ts during membership update, so Zero can use that information to purge Oracle.
* Block on Tablet call until we find a Zero leader.
* More debugging via traces.
* Simplify connToZeroLeader. Try to not add new bugs.
* Update blockade test with restart option.
* Self Reviews.
  • Loading branch information
manishrjain authored and dna2github committed Jul 19, 2019
1 parent 8d61a76 commit efca1f0
Show file tree
Hide file tree
Showing 12 changed files with 888 additions and 697 deletions.
7 changes: 7 additions & 0 deletions conn/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ func (p *proposals) Store(key string, pctx *ProposalCtx) bool {
return true
}

func (p *proposals) Ctx(key string) context.Context {
if pctx := p.Get(key); pctx != nil {
return pctx.Ctx
}
return context.Background()
}

func (p *proposals) Get(key string) *ProposalCtx {
p.RLock()
defer p.RUnlock()
Expand Down
15 changes: 12 additions & 3 deletions contrib/blockade/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ func testCommon(remove, join string, minAlphasUp int) error {
if err := increment(minAlphasUp); err != nil {
return err
}
// Then join.
// Then join, if available.
if len(join) == 0 {
continue
}
if err := run(ctxb, join); err != nil {
return err
}
Expand Down Expand Up @@ -155,10 +158,16 @@ func runTests() error {
fmt.Println("===> Slow TEST: OK")

if err := testCommon("blockade stop", "blockade start --all", 2); err != nil {
fmt.Printf("Error testRestart: %v\n", err)
fmt.Printf("Error testRestart with stop: %v\n", err)
return err
}
fmt.Println("===> Restart TEST1: OK")

if err := testCommon("blockade restart", "", 3); err != nil {
fmt.Printf("Error testRestart with restart: %v\n", err)
return err
}
fmt.Println("===> Restart TEST: OK")
fmt.Println("===> Restart TEST2: OK")

if err := testCommon("blockade partition", "blockade join", 2); err != nil {
fmt.Printf("Error testPartitions: %v\n", err)
Expand Down
42 changes: 3 additions & 39 deletions dgraph/cmd/zero/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ func (o *Oracle) purgeBelow(minTs uint64) {
o.Lock()
defer o.Unlock()

// TODO: HACK. Remove this later.
glog.Infof("Not purging below: %d", minTs)
return

// Dropping would be cheaper if abort/commits map is sharded
for ts := range o.commits {
if ts < minTs {
Expand Down Expand Up @@ -203,7 +199,9 @@ func (o *Oracle) sendDeltasToSubscribers() {
// Don't goto slurp_loop, because it would break from select immediately.
}

glog.V(2).Infof("DoneUntil: %d. Sending delta: %+v\n", o.doneUntil.DoneUntil(), delta)
if glog.V(3) {
glog.Infof("DoneUntil: %d. Sending delta: %+v\n", o.doneUntil.DoneUntil(), delta)
}
o.Lock()
for id, ch := range o.subscribers {
select {
Expand Down Expand Up @@ -446,40 +444,6 @@ func (s *Server) SyncedUntil() uint64 {
return syncUntil
}

func (s *Server) purgeOracle() {
ticker := time.NewTicker(time.Second * 30)
defer ticker.Stop()

var lastPurgeTs uint64
OUTER:
for {
<-ticker.C
groups := s.KnownGroups()
var minTs uint64
for _, group := range groups {
pl := s.Leader(group)
if pl == nil {
glog.Errorf("No healthy connection found to leader of group %d\n", group)
goto OUTER
}
c := pb.NewWorkerClient(pl.Get())
num, err := c.PurgeTs(context.Background(), &api.Payload{})
if err != nil {
glog.Errorf("Error while fetching minTs from group %d, err: %v\n", group, err)
goto OUTER
}
if minTs == 0 || num.Val < minTs {
minTs = num.Val
}
}

if minTs > 0 && minTs != lastPurgeTs {
s.orc.purgeBelow(minTs)
lastPurgeTs = minTs
}
}
}

func (s *Server) TryAbort(ctx context.Context,
txns *pb.TxnTimestamps) (*pb.OracleDelta, error) {
delta := &pb.OracleDelta{}
Expand Down
Loading

0 comments on commit efca1f0

Please sign in to comment.