Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Split membership sync endpoints #2773

Merged
merged 9 commits into from
Nov 25, 2018
42 changes: 3 additions & 39 deletions dgraph/cmd/zero/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ func (o *Oracle) purgeBelow(minTs uint64) {
o.Lock()
defer o.Unlock()

// TODO: HACK. Remove this later.
glog.Infof("Not purging below: %d", minTs)
return

// Dropping would be cheaper if abort/commits map is sharded
for ts := range o.commits {
if ts < minTs {
Expand Down Expand Up @@ -203,7 +199,9 @@ func (o *Oracle) sendDeltasToSubscribers() {
// Don't goto slurp_loop, because it would break from select immediately.
}

glog.V(2).Infof("DoneUntil: %d. Sending delta: %+v\n", o.doneUntil.DoneUntil(), delta)
if glog.V(3) {
glog.Infof("DoneUntil: %d. Sending delta: %+v\n", o.doneUntil.DoneUntil(), delta)
}
o.Lock()
for id, ch := range o.subscribers {
select {
Expand Down Expand Up @@ -446,40 +444,6 @@ func (s *Server) SyncedUntil() uint64 {
return syncUntil
}

// purgeOracle runs forever, waking every 30s to ask each group leader
// for its minimum safe timestamp, and purges the Oracle's commit state
// below the smallest such timestamp across all groups.
func (s *Server) purgeOracle() {
	ticker := time.NewTicker(time.Second * 30)
	defer ticker.Stop()

	var lastPurgeTs uint64
OUTER:
	for range ticker.C {
		var minTs uint64
		for _, gid := range s.KnownGroups() {
			leader := s.Leader(gid)
			if leader == nil {
				glog.Errorf("No healthy connection found to leader of group %d\n", gid)
				// Skip this tick entirely; try again on the next one.
				continue OUTER
			}
			client := pb.NewWorkerClient(leader.Get())
			num, err := client.PurgeTs(context.Background(), &api.Payload{})
			if err != nil {
				glog.Errorf("Error while fetching minTs from group %d, err: %v\n", gid, err)
				continue OUTER
			}
			if minTs == 0 || num.Val < minTs {
				minTs = num.Val
			}
		}

		// Only purge when we learned a new, non-zero minimum.
		if minTs > 0 && minTs != lastPurgeTs {
			s.orc.purgeBelow(minTs)
			lastPurgeTs = minTs
		}
	}
}

func (s *Server) TryAbort(ctx context.Context,
txns *pb.TxnTimestamps) (*pb.OracleDelta, error) {
delta := &pb.OracleDelta{}
Expand Down
213 changes: 122 additions & 91 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"log"
"math"
"time"

"google.golang.org/grpc"
Expand Down Expand Up @@ -142,6 +143,108 @@ func newGroup() *pb.Group {
}
}

// handleMemberProposal applies a Member proposal to the given membership
// state: it registers the member in group zero (the Zero servers) or in
// its data group, handles removal when the member reports itself dead,
// enforces the single-leader invariant within a group, and keeps the
// server's nextGroup counter in sync. Returns an error if the proposal
// conflicts with existing state. Mutates state in place.
func (n *node) handleMemberProposal(state *pb.MembershipState, member *pb.Member) error {
	m := n.server.member(member.Addr)
	// Ensures that different nodes don't have same address.
	if m != nil && (m.Id != member.Id || m.GroupId != member.GroupId) {
		return x.Errorf("Found another member %d with same address: %v", m.Id, m.Addr)
	}
	// Group id zero means this member is a Zero server, tracked separately
	// from the data groups.
	if member.GroupId == 0 {
		state.Zeros[member.Id] = member
		if member.Leader {
			// Unset leader flag for other nodes, there can be only one
			// leader at a time.
			for _, m := range state.Zeros {
				if m.Id != member.Id {
					m.Leader = false
				}
			}
		}
		return nil
	}
	// Lazily create the data group record on first sight of its id.
	group := state.Groups[member.GroupId]
	if group == nil {
		group = newGroup()
		state.Groups[member.GroupId] = group
	}
	m, has := group.Members[member.Id]
	if member.AmDead {
		if has {
			// Record the removal and drop the cached connection so we
			// stop talking to the dead node.
			delete(group.Members, member.Id)
			state.Removed = append(state.Removed, m)
			conn.Get().Remove(m.Addr)
		}
		// else already removed.
		return nil
	}
	if !has && len(group.Members) >= n.server.NumReplicas {
		// We shouldn't allow more members than the number of replicas.
		return x.Errorf("Group reached replication level. Can't add another member: %+v", member)
	}

	// Create a connection to this server.
	go conn.Get().Connect(member.Addr)

	group.Members[member.Id] = member
	// Increment nextGroup when we have enough replicas
	if member.GroupId == n.server.nextGroup &&
		len(group.Members) >= n.server.NumReplicas {
		n.server.nextGroup++
	}
	if member.Leader {
		// Unset leader flag for other nodes, there can be only one
		// leader at a time.
		for _, m := range group.Members {
			if m.Id != member.Id {
				m.Leader = false
			}
		}
	}
	// On replay of logs on restart we need to set nextGroup.
	if n.server.nextGroup <= member.GroupId {
		n.server.nextGroup = member.GroupId + 1
	}
	return nil
}

func (n *node) handleTabletProposal(state *pb.MembershipState, tablet *pb.Tablet) error {
if tablet.GroupId == 0 {
return x.Errorf("Tablet group id is zero: %+v", tablet)
}
group := state.Groups[tablet.GroupId]
if tablet.Remove {
glog.Infof("Removing tablet for attr: [%v], gid: [%v]\n", tablet.Predicate, tablet.GroupId)
if group != nil {
delete(group.Tablets, tablet.Predicate)
}
return nil
}
if group == nil {
group = newGroup()
state.Groups[tablet.GroupId] = group
}

// There's a edge case that we're handling.
// Two servers ask to serve the same tablet, then we need to ensure that
// only the first one succeeds.
if tablet := n.server.servingTablet(tablet.Predicate); tablet != nil {
if tablet.Force {
originalGroup := state.Groups[tablet.GroupId]
delete(originalGroup.Tablets, tablet.Predicate)
} else {
if tablet.GroupId != tablet.GroupId {
return x.Errorf(
"Tablet for attr: [%s], gid: [%d] already served by group: [%d]\n",
tablet.Predicate, tablet.GroupId, tablet.GroupId)
}
// This update can come from tablet size.
tablet.ReadOnly = tablet.ReadOnly
manishrjain marked this conversation as resolved.
Show resolved Hide resolved
}
}
group.Tablets[tablet.Predicate] = tablet
return nil
}

func (n *node) applyProposal(e raftpb.Entry) (string, error) {
var p pb.ZeroProposal
// Raft commits empty entry on becoming a leader.
Expand Down Expand Up @@ -172,103 +275,31 @@ func (n *node) applyProposal(e raftpb.Entry) (string, error) {
}
state.MaxRaftId = p.MaxRaftId
}
if p.Member != nil {
m := n.server.member(p.Member.Addr)
// Ensures that different nodes don't have same address.
if m != nil && (m.Id != p.Member.Id || m.GroupId != p.Member.GroupId) {
return p.Key, errInvalidAddress
}
if p.Member.GroupId == 0 {
state.Zeros[p.Member.Id] = p.Member
if p.Member.Leader {
// Unset leader flag for other nodes, there can be only one
// leader at a time.
for _, m := range state.Zeros {
if m.Id != p.Member.Id {
m.Leader = false
}
}
if p.SnapshotTs != nil {
for gid, ts := range p.SnapshotTs {
if group, ok := state.Groups[gid]; ok {
group.SnapshotTs = x.Max(group.SnapshotTs, ts)
}
return p.Key, nil
}
group := state.Groups[p.Member.GroupId]
if group == nil {
group = newGroup()
state.Groups[p.Member.GroupId] = group
purgeTs := uint64(math.MaxUint64)
for _, group := range state.Groups {
purgeTs = x.Min(purgeTs, group.SnapshotTs)
}
m, has := group.Members[p.Member.Id]
if p.Member.AmDead {
if has {
delete(group.Members, p.Member.Id)
state.Removed = append(state.Removed, m)
conn.Get().Remove(m.Addr)
}
// else already removed.
return p.Key, nil
if purgeTs < math.MaxUint64 {
n.server.orc.purgeBelow(purgeTs)
}
if !has && len(group.Members) >= n.server.NumReplicas {
// We shouldn't allow more members than the number of replicas.
return p.Key, errInvalidProposal
}

// Create a connection to this server.
go conn.Get().Connect(p.Member.Addr)

group.Members[p.Member.Id] = p.Member
// Increment nextGroup when we have enough replicas
if p.Member.GroupId == n.server.nextGroup &&
len(group.Members) >= n.server.NumReplicas {
n.server.nextGroup++
}
if p.Member.Leader {
// Unset leader flag for other nodes, there can be only one
// leader at a time.
for _, m := range group.Members {
if m.Id != p.Member.Id {
m.Leader = false
}
}
}
// On replay of logs on restart we need to set nextGroup.
if n.server.nextGroup <= p.Member.GroupId {
n.server.nextGroup = p.Member.GroupId + 1
}
if p.Member != nil {
if err := n.handleMemberProposal(state, p.Member); err != nil {
glog.Errorf("While applying membership proposal: %+v", err)
return p.Key, err
}
}
if p.Tablet != nil {
if p.Tablet.GroupId == 0 {
return p.Key, errInvalidProposal
}
group := state.Groups[p.Tablet.GroupId]
if p.Tablet.Remove {
glog.Infof("Removing tablet for attr: [%v], gid: [%v]\n", p.Tablet.Predicate, p.Tablet.GroupId)
if group != nil {
delete(group.Tablets, p.Tablet.Predicate)
}
return p.Key, nil
}
if group == nil {
group = newGroup()
state.Groups[p.Tablet.GroupId] = group
}

// There's an edge case that we're handling.
// Two servers ask to serve the same tablet, then we need to ensure that
// only the first one succeeds.
if tablet := n.server.servingTablet(p.Tablet.Predicate); tablet != nil {
if p.Tablet.Force {
originalGroup := state.Groups[tablet.GroupId]
delete(originalGroup.Tablets, p.Tablet.Predicate)
} else {
if tablet.GroupId != p.Tablet.GroupId {
glog.Infof("Tablet for attr: [%s], gid: [%d] is already being served by group: [%d]\n",
tablet.Predicate, p.Tablet.GroupId, tablet.GroupId)
return p.Key, errTabletAlreadyServed
}
// This update can come from tablet size.
p.Tablet.ReadOnly = tablet.ReadOnly
}
if err := n.handleTabletProposal(state, p.Tablet); err != nil {
glog.Errorf("While applying tablet proposal: %+v", err)
return p.Key, err
}
group.Tablets[p.Tablet.Predicate] = p.Tablet
}

if p.MaxLeaseId > state.MaxLeaseId {
Expand Down Expand Up @@ -359,12 +390,12 @@ func (n *node) initAndStartNode() error {
} else if len(opts.peer) > 0 {
p := conn.Get().Connect(opts.peer)
if p == nil {
return errInvalidAddress
return x.Errorf("Unhealthy connection to %v", opts.peer)
}

gconn := p.Get()
c := pb.NewRaftClient(gconn)
err := errJoinCluster
err := x.Errorf("Unable to join cluster")
timeout := 8 * time.Second
for i := 0; err != nil; i++ {
ctx, cancel := context.WithTimeout(n.ctx, timeout)
Expand Down
Loading