replica_destroy.go

// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/engine"
	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
	"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// DestroyReason indicates if a replica is alive, destroyed, corrupted or pending destruction.
type DestroyReason int

const (
	// The replica is alive.
	destroyReasonAlive DestroyReason = iota
	// The replica is in the process of being removed but has not been removed
	// yet. It exists to avoid races between two threads which may decide to
	// destroy a replica (e.g. processing a ChangeReplicasTrigger removing the
	// range and receiving a raft message with a higher replica ID).
	destroyReasonRemovalPending
	// The replica has been GCed.
	destroyReasonRemoved
	// The replica has been merged into its left-hand neighbor, but its left-hand
	// neighbor hasn't yet subsumed it.
	destroyReasonMergePending
)

// destroyStatus records the reason a replica was (or is about to be)
// destroyed, along with the error to return to requests that still reach it.
type destroyStatus struct {
	reason DestroyReason
	err    error
}

func (s destroyStatus) String() string {
	return fmt.Sprintf("{%v %d}", s.err, s.reason)
}

// Set records the given error and destroy reason.
func (s *destroyStatus) Set(err error, reason DestroyReason) {
	s.err = err
	s.reason = reason
}

// IsAlive returns true when a replica is alive.
func (s destroyStatus) IsAlive() bool {
	return s.reason == destroyReasonAlive
}

// Removed returns whether the replica has been removed.
func (s destroyStatus) Removed() bool {
	return s.reason == destroyReasonRemoved
}

// RemovalPending returns whether the replica is removed or in the process of
// being removed.
func (s destroyStatus) RemovalPending() bool {
	return s.reason == destroyReasonRemovalPending || s.reason == destroyReasonRemoved
}
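
// Illustrative sketch (not part of the original file): a request path that
// already holds Replica.mu is assumed to consult the destroy status before
// doing work on behalf of the replica, roughly:
//
//	if ds := r.mu.destroyStatus; !ds.IsAlive() {
//		return roachpb.NewError(ds.err) // reject with the recorded error
//	}
//
// The destroyStatus field name on Replica.mu is an assumption here; this file
// only defines the destroyStatus type and its methods.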

// preDestroyRaftMuLocked clears the replica's data (according to clearOpt)
// and writes a tombstone for nextReplicaID into the same writer.
func (r *Replica) preDestroyRaftMuLocked(
	ctx context.Context,
	reader engine.Reader,
	writer engine.Writer,
	nextReplicaID roachpb.ReplicaID,
	clearOpt clearRangeOption,
	mustClearRange bool,
) error {
	desc := r.Desc()
	err := clearRangeData(desc, reader, writer, clearOpt, mustClearRange)
	if err != nil {
		return err
	}

	// Save a tombstone to ensure that replica IDs never get reused.
	//
	// NB: Legacy tombstones (which are in the replicated key space) are wiped
	// in clearRangeData, but that's OK since we're writing a new one in the same
	// batch (and in particular, sequenced *after* the wipe).
	return r.setTombstoneKey(ctx, writer, nextReplicaID)
}

// postDestroyRaftMuLocked runs cleanup that must happen after the destruction
// batch has committed: it suggests the cleared keyspace to the compactor and
// removes the replica's sideloaded storage, if any.
func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCStats) error {
	// Suggest the cleared range to the compactor queue.
	//
	// TODO(benesch): we would ideally atomically suggest the compaction with
	// the deletion of the data itself.
	desc := r.Desc()
	r.store.compactor.Suggest(ctx, storagepb.SuggestedCompaction{
		StartKey: roachpb.Key(desc.StartKey),
		EndKey:   roachpb.Key(desc.EndKey),
		Compaction: storagepb.Compaction{
			Bytes:            ms.Total(),
			SuggestedAtNanos: timeutil.Now().UnixNano(),
		},
	})

	// NB: we need the nil check below because it's possible that we're GC'ing a
	// Replica without a replicaID, in which case it does not have a sideloaded
	// storage.
	//
	// TODO(tschottdorf): at node startup, we should remove all on-disk
	// directories belonging to replicas which aren't present. A crash before a
	// call to postDestroyRaftMuLocked will currently leave the files around
	// forever.
	if r.raftMu.sideloaded != nil {
		return r.raftMu.sideloaded.Clear(ctx)
	}
	return nil
}
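
// Illustrative note (not part of the original file): the two helpers above are
// meant to bracket a single destruction, in the order used by
// destroyRaftMuLocked below, roughly:
//
//	batch := r.Engine().NewWriteOnlyBatch()
//	defer batch.Close()
//	// ...error handling elided...
//	r.preDestroyRaftMuLocked(ctx, r.Engine(), batch, nextReplicaID, clearAll, false /* mustClearRange */)
//	batch.Commit(true) // sync, so the tombstone is durable before sideloaded files are removed
//	r.postDestroyRaftMuLocked(ctx, ms)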

// destroyUninitializedReplicaRaftMuLocked is called when we know that an
// uninitialized replica has been removed and re-added as a different replica.
// We're safe to GC its hard state because nobody cares about our votes
// anymore. The sad thing is we aren't safe to GC the range's data because we
// don't know where it is. In most cases we'll either get a snapshot or we'll
// find out that this uninitialized replica had been part of a split and we
// can at least clear that split data. In general we shouldn't have any data
// except in that split case, so it should be okay.
func (r *Replica) destroyUninitializedReplicaRaftMuLocked(
	ctx context.Context, nextReplicaID roachpb.ReplicaID,
) {
	batch := r.Engine().NewWriteOnlyBatch()
	defer batch.Close()
	if err := r.preDestroyRaftMuLocked(
		ctx,
		r.Engine(),
		batch,
		nextReplicaID,
		clearRangeIDLocalOnly,
		false, /* mustClearRange */
	); err != nil {
		log.Fatal(ctx, err)
	}

	// We need to sync here because we are potentially deleting sideloaded
	// proposals from the file system next. We could alternatively write the
	// tombstone in a synchronous batch first and then delete the data, but
	// then we would need to handle the case in which both the tombstone and
	// leftover replica data exist.
	if err := batch.Commit(true); err != nil {
		log.Fatal(ctx, err)
	}

	if r.raftMu.sideloaded != nil {
		if err := r.raftMu.sideloaded.Clear(ctx); err != nil {
			log.Warningf(ctx, "failed to remove sideload storage for %v: %v", r, err)
		}
	}
}
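
// Illustrative note (not part of the original file): compared to
// destroyRaftMuLocked below, this path passes clearRangeIDLocalOnly rather
// than clearAll, so only the range-ID-local state and the tombstone are
// written; the replicated range data is left alone because, as the comment
// above explains, an uninitialized replica does not know its key bounds.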

// destroyRaftMuLocked deletes data associated with a replica, leaving a
// tombstone.
func (r *Replica) destroyRaftMuLocked(ctx context.Context, nextReplicaID roachpb.ReplicaID) error {
	startTime := timeutil.Now()

	ms := r.GetMVCCStats()

	batch := r.Engine().NewWriteOnlyBatch()
	defer batch.Close()
	if err := r.preDestroyRaftMuLocked(
		ctx,
		r.Engine(),
		batch,
		nextReplicaID,
		clearAll,
		false, /* mustClearRange */
	); err != nil {
		return err
	}
	preTime := timeutil.Now()

	// We need to sync here because we are potentially deleting sideloaded
	// proposals from the file system next. We could alternatively write the
	// tombstone in a synchronous batch first and then delete the data, but
	// then we would need to handle the case in which both the tombstone and
	// leftover replica data exist.
	if err := batch.Commit(true); err != nil {
		return err
	}
	commitTime := timeutil.Now()

	if err := r.postDestroyRaftMuLocked(ctx, ms); err != nil {
		return err
	}

	log.Infof(ctx, "removed %d (%d+%d) keys in %0.0fms [clear=%0.0fms commit=%0.0fms]",
		ms.KeyCount+ms.SysCount, ms.KeyCount, ms.SysCount,
		commitTime.Sub(startTime).Seconds()*1000,
		preTime.Sub(startTime).Seconds()*1000,
		commitTime.Sub(preTime).Seconds()*1000)
	return nil
}
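
// Illustrative sketch (not part of the original file): a caller tearing down
// an initialized replica is assumed to first record the destroy status and
// cancel in-flight proposals before clearing the data, roughly:
//
//	r.raftMu.Lock()
//	r.mu.Lock()
//	r.mu.destroyStatus.Set(removalErr, destroyReasonRemoved) // removalErr is hypothetical
//	r.cancelPendingCommandsLocked()
//	r.mu.Unlock()
//	if err := r.destroyRaftMuLocked(ctx, nextReplicaID); err != nil {
//		log.Fatal(ctx, err)
//	}
//	r.raftMu.Unlock()
//
// The exact ordering and the destroyStatus field on Replica.mu are assumptions
// here; this file only defines the helpers themselves.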

// cancelPendingCommandsLocked cancels all outstanding proposals.
// It requires that both mu and raftMu are held.
func (r *Replica) cancelPendingCommandsLocked() {
	r.raftMu.AssertHeld()
	r.mu.AssertHeld()
	r.mu.proposalBuf.FlushLockedWithoutProposing()
	for _, p := range r.mu.proposals {
		r.cleanupFailedProposalLocked(p)
		// NB: each proposal needs its own version of the error (i.e. don't try to
		// share the error across proposals).
		p.finishApplication(proposalResult{
			Err: roachpb.NewError(roachpb.NewAmbiguousResultError("removing replica")),
		})
	}
}

// setTombstoneKey writes a tombstone to disk to ensure that replica IDs never
// get reused. It determines what the minimum next replica ID can be using
// the provided nextReplicaID and the Replica's own ID.
//
// We have to be careful to set the right key, since a replica can be using an
// ID that it hasn't yet received a RangeDescriptor for if it receives raft
// requests for that replica ID (as seen in #14231).
func (r *Replica) setTombstoneKey(
	ctx context.Context, eng engine.Writer, externalNextReplicaID roachpb.ReplicaID,
) error {
	r.mu.Lock()
	nextReplicaID := r.mu.state.Desc.NextReplicaID
	if nextReplicaID < externalNextReplicaID {
		nextReplicaID = externalNextReplicaID
	}
	if nextReplicaID > r.mu.minReplicaID {
		r.mu.minReplicaID = nextReplicaID
	}
	r.mu.Unlock()

	tombstoneKey := keys.RaftTombstoneKey(r.RangeID)
	tombstone := &roachpb.RaftTombstone{
		NextReplicaID: nextReplicaID,
	}
	// "Blind" because ms == nil and timestamp == hlc.Timestamp{}.
	return engine.MVCCBlindPutProto(ctx, eng, nil, tombstoneKey,
		hlc.Timestamp{}, tombstone, nil)
}