Skip to content

Commit

Permalink
storage: rework AdminChangeReplicas for atomic membership changes
Browse files Browse the repository at this point in the history
Following the plan laid out in #39485, this adds API support for
general replication changes to `AdminChangeReplicasRequest`.

`(*DB).AtomicChangeReplicas` will now accept an arbitrary set
of additions/removals, though only on paper - the changes will
be executed individually.

In 19.2, production code will not use this request - it exists solely
for testing. The single user of atomic replication changes will be the
replicate queue, which has direct access to the replication change code
on the local replicas and thus does not need to use this request type.

The compatibility story is thus straightforward (this request is never
persisted): We simply populate both the deprecated and the new field (and
it isn't safe to emit "mixed" changes until all nodes run 19.2 and it's
not worth plumbing a setting around), and in 20.1 we remove the old
fields.  A maybe-snag is that as a result, there are a few months left
in this release in which folks may accidentally mix additions and
removals in a replica change without proper version gating. This wasn't
deemed very likely.

Release note: None
  • Loading branch information
tbg committed Aug 13, 2019
1 parent 74c2efe commit e0a09df
Show file tree
Hide file tree
Showing 14 changed files with 2,107 additions and 1,168 deletions.
353 changes: 321 additions & 32 deletions c-deps/libroach/protos/roachpb/api.pb.cc

Large diffs are not rendered by default.

455 changes: 340 additions & 115 deletions c-deps/libroach/protos/roachpb/api.pb.h

Large diffs are not rendered by default.

11 changes: 4 additions & 7 deletions pkg/internal/client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,10 +643,7 @@ func (b *Batch) adminTransferLease(key interface{}, target roachpb.StoreID) {
// adminChangeReplicas is only exported on DB. It is here for symmetry with the
// other operations.
func (b *Batch) adminChangeReplicas(
key interface{},
changeType roachpb.ReplicaChangeType,
targets []roachpb.ReplicationTarget,
expDesc roachpb.RangeDescriptor,
key interface{}, expDesc roachpb.RangeDescriptor, chgs []roachpb.ReplicationChange,
) {
k, err := marshalKey(key)
if err != nil {
Expand All @@ -657,10 +654,10 @@ func (b *Batch) adminChangeReplicas(
RequestHeader: roachpb.RequestHeader{
Key: k,
},
ChangeType: changeType,
Targets: targets,
ExpDesc: expDesc,
ExpDesc: expDesc,
}
req.AddChanges(chgs...)

b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
}
Expand Down
22 changes: 19 additions & 3 deletions pkg/internal/client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,12 +525,28 @@ func (db *DB) AdminTransferLease(
func (db *DB) AdminChangeReplicas(
ctx context.Context,
key interface{},
changeType roachpb.ReplicaChangeType,
targets []roachpb.ReplicationTarget,
expDesc roachpb.RangeDescriptor,
chgs []roachpb.ReplicationChange,
) (*roachpb.RangeDescriptor, error) {
if ctx.Value("testing") == nil {
// Disallow trying to add and remove replicas in the same set of
// changes. This will only work when the node receiving the request is
// running 19.2 (code, not cluster version).
//
// TODO(tbg): remove this when 19.2 is released.
var typ *roachpb.ReplicaChangeType
for _, chg := range chgs {
if typ == nil {
curTyp := chg.ChangeType
typ = &curTyp
} else if *typ != chg.ChangeType {
return nil, errors.Errorf("can not mix %s and %s", *typ, chg.ChangeType)
}
}
}

b := &Batch{}
b.adminChangeReplicas(key, changeType, targets, expDesc)
b.adminChangeReplicas(key, expDesc, chgs)
if err := getOneErr(db.Run(ctx, b), b); err != nil {
return nil, err
}
Expand Down
43 changes: 43 additions & 0 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1185,3 +1185,46 @@ func (e *RangeFeedEvent) MustSetValue(value interface{}) {
panic(fmt.Sprintf("%T excludes %T", e, value))
}
}

// MakeReplicationChanges returns a slice of changes of the given type with an
// item for each target.
func MakeReplicationChanges(
changeType ReplicaChangeType, targets ...ReplicationTarget,
) []ReplicationChange {
chgs := make([]ReplicationChange, 0, len(targets))
for _, target := range targets {
chgs = append(chgs, ReplicationChange{
ChangeType: changeType,
Target: target,
})
}
return chgs
}

// AddChanges adds a batch of changes to the request in a backwards-compatible
// way.
func (r *AdminChangeReplicasRequest) AddChanges(chgs ...ReplicationChange) {
r.InternalChanges = append(r.InternalChanges, chgs...)

r.DeprecatedChangeType = chgs[0].ChangeType
for _, chg := range chgs {
r.DeprecatedTargets = append(r.DeprecatedTargets, chg.Target)
}
}

// Changes returns the changes requested by this AdminChangeReplicasRequest, taking
// the deprecated method of doing so into account.
func (r *AdminChangeReplicasRequest) Changes() []ReplicationChange {
if len(r.InternalChanges) > 0 {
return r.InternalChanges
}

sl := make([]ReplicationChange, len(r.DeprecatedTargets))
for _, target := range r.DeprecatedTargets {
sl = append(sl, ReplicationChange{
ChangeType: r.DeprecatedChangeType,
Target: target,
})
}
return sl
}
Loading

0 comments on commit e0a09df

Please sign in to comment.