Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: rework AdminChangeReplicas for atomic membership changes #39611

Merged
merged 1 commit into from
Aug 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
353 changes: 321 additions & 32 deletions c-deps/libroach/protos/roachpb/api.pb.cc

Large diffs are not rendered by default.

455 changes: 340 additions & 115 deletions c-deps/libroach/protos/roachpb/api.pb.h

Large diffs are not rendered by default.

11 changes: 4 additions & 7 deletions pkg/internal/client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,10 +643,7 @@ func (b *Batch) adminTransferLease(key interface{}, target roachpb.StoreID) {
// adminChangeReplicas is only exported on DB. It is here for symmetry with the
// other operations.
func (b *Batch) adminChangeReplicas(
key interface{},
changeType roachpb.ReplicaChangeType,
targets []roachpb.ReplicationTarget,
expDesc roachpb.RangeDescriptor,
key interface{}, expDesc roachpb.RangeDescriptor, chgs []roachpb.ReplicationChange,
) {
k, err := marshalKey(key)
if err != nil {
Expand All @@ -657,10 +654,10 @@ func (b *Batch) adminChangeReplicas(
RequestHeader: roachpb.RequestHeader{
Key: k,
},
ChangeType: changeType,
Targets: targets,
ExpDesc: expDesc,
ExpDesc: expDesc,
}
req.AddChanges(chgs...)

b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}
Expand Down
22 changes: 19 additions & 3 deletions pkg/internal/client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,12 +525,28 @@ func (db *DB) AdminTransferLease(
func (db *DB) AdminChangeReplicas(
ctx context.Context,
key interface{},
changeType roachpb.ReplicaChangeType,
targets []roachpb.ReplicationTarget,
expDesc roachpb.RangeDescriptor,
chgs []roachpb.ReplicationChange,
) (*roachpb.RangeDescriptor, error) {
if ctx.Value("testing") == nil {
// Disallow trying to add and remove replicas in the same set of
// changes. This will only work when the node receiving the request is
// running 19.2 (code, not cluster version).
//
// TODO(tbg): remove this when 19.2 is released.
var typ *roachpb.ReplicaChangeType
for i := range chgs {
chg := &chgs[i]
if typ == nil {
typ = &chg.ChangeType
} else if *typ != chg.ChangeType {
return nil, errors.Errorf("can not mix %s and %s", *typ, chg.ChangeType)
}
}
}

b := &Batch{}
b.adminChangeReplicas(key, changeType, targets, expDesc)
b.adminChangeReplicas(key, expDesc, chgs)
if err := getOneErr(db.Run(ctx, b), b); err != nil {
return nil, err
}
Expand Down
43 changes: 43 additions & 0 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1185,3 +1185,46 @@ func (e *RangeFeedEvent) MustSetValue(value interface{}) {
panic(fmt.Sprintf("%T excludes %T", e, value))
}
}

// MakeReplicationChanges returns a slice of changes of the given type with an
// item for each target.
func MakeReplicationChanges(
changeType ReplicaChangeType, targets ...ReplicationTarget,
) []ReplicationChange {
chgs := make([]ReplicationChange, 0, len(targets))
for _, target := range targets {
chgs = append(chgs, ReplicationChange{
ChangeType: changeType,
Target: target,
})
}
return chgs
}

// AddChanges adds a batch of changes to the request in a backwards-compatible
// way.
func (acrr *AdminChangeReplicasRequest) AddChanges(chgs ...ReplicationChange) {
acrr.InternalChanges = append(acrr.InternalChanges, chgs...)

acrr.DeprecatedChangeType = chgs[0].ChangeType
for _, chg := range chgs {
acrr.DeprecatedTargets = append(acrr.DeprecatedTargets, chg.Target)
}
}

// Changes returns the changes requested by this AdminChangeReplicasRequest, taking
// the deprecated method of doing so into account.
func (acrr *AdminChangeReplicasRequest) Changes() []ReplicationChange {
if len(acrr.InternalChanges) > 0 {
return acrr.InternalChanges
}

sl := make([]ReplicationChange, len(acrr.DeprecatedTargets))
for _, target := range acrr.DeprecatedTargets {
sl = append(sl, ReplicationChange{
ChangeType: acrr.DeprecatedChangeType,
Target: target,
})
}
return sl
}
Loading