Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Split membership sync endpoints #2773

Merged
merged 9 commits into from
Nov 25, 2018
7 changes: 7 additions & 0 deletions conn/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ func (p *proposals) Store(key string, pctx *ProposalCtx) bool {
return true
}

func (p *proposals) Ctx(key string) context.Context {
if pctx := p.Get(key); pctx != nil {
return pctx.Ctx
}
return context.Background()
}

func (p *proposals) Get(key string) *ProposalCtx {
p.RLock()
defer p.RUnlock()
Expand Down
15 changes: 12 additions & 3 deletions contrib/blockade/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ func testCommon(remove, join string, minAlphasUp int) error {
if err := increment(minAlphasUp); err != nil {
return err
}
// Then join.
// Then join, if available.
if len(join) == 0 {
continue
}
if err := run(ctxb, join); err != nil {
return err
}
Expand Down Expand Up @@ -155,10 +158,16 @@ func runTests() error {
fmt.Println("===> Slow TEST: OK")

if err := testCommon("blockade stop", "blockade start --all", 2); err != nil {
fmt.Printf("Error testRestart: %v\n", err)
fmt.Printf("Error testRestart with stop: %v\n", err)
return err
}
fmt.Println("===> Restart TEST1: OK")

if err := testCommon("blockade restart", "", 3); err != nil {
fmt.Printf("Error testRestart with restart: %v\n", err)
return err
}
fmt.Println("===> Restart TEST: OK")
fmt.Println("===> Restart TEST2: OK")

if err := testCommon("blockade partition", "blockade join", 2); err != nil {
fmt.Printf("Error testPartitions: %v\n", err)
Expand Down
42 changes: 3 additions & 39 deletions dgraph/cmd/zero/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ func (o *Oracle) purgeBelow(minTs uint64) {
o.Lock()
defer o.Unlock()

// TODO: HACK. Remove this later.
glog.Infof("Not purging below: %d", minTs)
return

// Dropping would be cheaper if abort/commits map is sharded
for ts := range o.commits {
if ts < minTs {
Expand Down Expand Up @@ -203,7 +199,9 @@ func (o *Oracle) sendDeltasToSubscribers() {
// Don't goto slurp_loop, because it would break from select immediately.
}

glog.V(2).Infof("DoneUntil: %d. Sending delta: %+v\n", o.doneUntil.DoneUntil(), delta)
if glog.V(3) {
glog.Infof("DoneUntil: %d. Sending delta: %+v\n", o.doneUntil.DoneUntil(), delta)
}
o.Lock()
for id, ch := range o.subscribers {
select {
Expand Down Expand Up @@ -446,40 +444,6 @@ func (s *Server) SyncedUntil() uint64 {
return syncUntil
}

func (s *Server) purgeOracle() {
ticker := time.NewTicker(time.Second * 30)
defer ticker.Stop()

var lastPurgeTs uint64
OUTER:
for {
<-ticker.C
groups := s.KnownGroups()
var minTs uint64
for _, group := range groups {
pl := s.Leader(group)
if pl == nil {
glog.Errorf("No healthy connection found to leader of group %d\n", group)
goto OUTER
}
c := pb.NewWorkerClient(pl.Get())
num, err := c.PurgeTs(context.Background(), &api.Payload{})
if err != nil {
glog.Errorf("Error while fetching minTs from group %d, err: %v\n", group, err)
goto OUTER
}
if minTs == 0 || num.Val < minTs {
minTs = num.Val
}
}

if minTs > 0 && minTs != lastPurgeTs {
s.orc.purgeBelow(minTs)
lastPurgeTs = minTs
}
}
}

func (s *Server) TryAbort(ctx context.Context,
txns *pb.TxnTimestamps) (*pb.OracleDelta, error) {
delta := &pb.OracleDelta{}
Expand Down
Loading