forked from opensearch-project/OpenSearch
-
Notifications
You must be signed in to change notification settings - Fork 2
/
OngoingSegmentReplications.java
261 lines (240 loc) · 10.9 KB
/
OngoingSegmentReplications.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.FileChunkWriter;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.CopyState;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* Manages references to ongoing segrep events on a node.
* Each replica will have a new {@link SegmentReplicationSourceHandler} created when starting replication.
* CopyStates will be cached for reuse between replicas and only released when all replicas have finished copying segments.
*
* @opensearch.internal
*/
class OngoingSegmentReplications {
// Source of the chunk size and max-concurrent-file-chunks values handed to each new source handler.
private final RecoverySettings recoverySettings;
// Used to resolve the IndexService / IndexShard for a checkpoint's shard when building a CopyState.
private final IndicesService indicesService;
// Cache of CopyStates keyed by the checkpoint they were requested with; backed by a synchronized HashMap.
private final Map<ReplicationCheckpoint, CopyState> copyStateMap;
// Registered source handlers keyed by the target allocation ID of the request; backed by a concurrent map.
private final Map<String, SegmentReplicationSourceHandler> allocationIdToHandlers;
/**
 * Creates an instance with empty handler and CopyState registries.
 *
 * @param indicesService   {@link IndicesService} used to look up shards when a CopyState is built
 * @param recoverySettings {@link RecoverySettings} supplying the transfer settings for new handlers
 */
OngoingSegmentReplications(IndicesService indicesService, RecoverySettings recoverySettings) {
    // Both registries start out empty.
    this.allocationIdToHandlers = ConcurrentCollections.newConcurrentMap();
    this.copyStateMap = Collections.synchronizedMap(new HashMap<>());
    this.recoverySettings = recoverySettings;
    this.indicesService = indicesService;
}
/**
* Operations on the {@link #copyStateMap} member.
*/
/**
 * Returns the {@link CopyState} associated with the given {@link ReplicationCheckpoint}.
 * <p>
 * On a cache hit the stored CopyState has its reference count incremented and is returned.
 * On a cache miss a new CopyState is built for the checkpoint's shard, stored in
 * {@link #copyStateMap} under the given checkpoint, and returned.
 *
 * @param checkpoint {@link ReplicationCheckpoint} used as the cache key
 * @return the cached or newly created {@link CopyState}
 * @throws IOException declared for the construction of a new {@link CopyState}
 */
synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) throws IOException {
    if (isInCopyStateMap(checkpoint) == false) {
        // Cache miss: resolve the shard named by the checkpoint and build a fresh CopyState for it.
        final ShardId requestedShardId = checkpoint.getShardId();
        final IndexService owningIndexService = indicesService.indexService(requestedShardId.getIndex());
        final IndexShard sourceShard = owningIndexService.getShard(requestedShardId.id());
        final CopyState freshState = new CopyState(checkpoint, sourceShard);
        // The key is the checkpoint from the request, not the checkpoint held by the new
        // CopyState. This maximizes cache hits when replication targets ask with an older
        // checkpoint; targets are expected to read the checkpoint in the returned CopyState
        // to bring themselves up to date.
        addToCopyStateMap(checkpoint, freshState);
        return freshState;
    }
    // Cache hit: every replica using this checkpoint takes its own reference,
    // which is dropped again when its copy completes.
    final CopyState sharedState = fetchFromCopyStateMap(checkpoint);
    sharedState.incRef();
    return sharedState;
}
/**
 * Begins sending the requested files to the replica.
 * <p>
 * If no handler is registered for the request's target allocation ID, the listener is
 * completed immediately with an empty response. Otherwise the listener is wrapped so that
 * the handler is deregistered and its {@link CopyState} reference released before the
 * listener resolves.
 *
 * @param request  {@link GetSegmentFilesRequest}
 * @param listener {@link ActionListener} that resolves when sending files is complete.
 */
void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
    final SegmentReplicationSourceHandler registeredHandler = allocationIdToHandlers.get(request.getTargetAllocationId());
    if (registeredHandler == null) {
        // Nothing was prepared for this allocation ID; reply with no files.
        listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList()));
        return;
    }
    if (registeredHandler.isReplicating()) {
        throw new OpenSearchException(
            "Replication to shard {}, on node {} has already started",
            request.getCheckpoint().getShardId(),
            request.getTargetNode()
        );
    }
    // Deregister the handler and release its CopyState before the caller's listener is notified.
    final ActionListener<GetSegmentFilesResponse> releasingListener = ActionListener.runBefore(listener, () -> {
        final SegmentReplicationSourceHandler finishedHandler = allocationIdToHandlers.remove(request.getTargetAllocationId());
        if (finishedHandler != null) {
            removeCopyState(finishedHandler.getCopyState());
        }
    });
    if (request.getFilesToFetch().isEmpty() == false) {
        registeredHandler.sendFiles(request, releasingListener);
        return;
    }
    // No files requested: complete right away, still going through the releasing wrapper.
    releasingListener.onResponse(new GetSegmentFilesResponse(Collections.emptyList()));
}
/**
 * Prepare for a Replication event. This method obtains a {@link CopyState} for the requested checkpoint
 * (reusing a cached one when available) that is intended to be sent back to the replica before copy is
 * initiated, so the replica can perform a diff against its local store. It then builds a handler to
 * orchestrate the segment copy and registers it under the request's target allocation ID; the copy is
 * started on a subsequent request from the replica with the list of required files.
 * <p>
 * If the handler cannot be created or registered, the CopyState reference acquired here is released
 * again before the exception propagates, so a rejected request does not leak a reference.
 *
 * @param request         {@link CheckpointInfoRequest}
 * @param fileChunkWriter {@link FileChunkWriter} writer to handle sending files over the transport layer.
 * @return {@link CopyState} the built CopyState for this replication event.
 * @throws IOException         - When there is an IO error building CopyState.
 * @throws OpenSearchException - When a handler is already registered for the target allocation ID.
 */
CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException {
    final CopyState copyState = getCachedCopyState(request.getCheckpoint());
    boolean handlerRegistered = false;
    try {
        final SegmentReplicationSourceHandler newHandler = createTargetHandler(
            request.getTargetNode(),
            copyState,
            request.getTargetAllocationId(),
            fileChunkWriter
        );
        if (allocationIdToHandlers.putIfAbsent(request.getTargetAllocationId(), newHandler) != null) {
            throw new OpenSearchException(
                "Shard copy {} on node {} already replicating",
                request.getCheckpoint().getShardId(),
                request.getTargetNode()
            );
        }
        handlerRegistered = true;
        return copyState;
    } finally {
        if (handlerRegistered == false) {
            // No handler owns the reference taken by getCachedCopyState, so give it back here.
            removeCopyState(copyState);
        }
    }
}
/**
 * Cancels every registered replication event whose {@link CopyState} belongs to the given shard.
 * Intended to be called when a primary is shutting down.
 *
 * @param shard  {@link IndexShard} whose replication events should be cancelled
 * @param reason {@link String} - Reason for the cancel
 */
synchronized void cancel(IndexShard shard, String reason) {
    final Predicate<SegmentReplicationSourceHandler> copiesFromShard = candidate -> {
        final ShardId handlerShardId = candidate.getCopyState().getShard().shardId();
        return handlerShardId.equals(shard.shardId());
    };
    cancelHandlers(copiesFromShard, reason);
}
/**
 * Cancel the Replication event registered for the given allocation ID, if there is one.
 * The handler is removed from the registry, cancelled, and its {@link CopyState} reference released.
 * Does nothing when no handler is registered under the allocation ID.
 *
 * @param allocationId {@link String} - Allocation ID.
 * @param reason       {@link String} - Reason for the cancel
 */
synchronized void cancel(String allocationId, String reason) {
    final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId);
    if (handler != null) {
        try {
            handler.cancel(reason);
        } finally {
            // The handler is already deregistered, so this is the only place left that can
            // release its CopyState reference - do it even if cancel() throws.
            removeCopyState(handler.getCopyState());
        }
    }
}
/**
 * Cancels all registered replication events whose target is the given {@link DiscoveryNode}.
 * The cancel reason passed to each handler is {@code "Node left"}.
 *
 * @param node {@link DiscoveryNode} node for which to cancel replication events.
 */
void cancelReplication(DiscoveryNode node) {
    final Predicate<SegmentReplicationSourceHandler> replicatesToNode = candidate -> candidate.getTargetNode().equals(node);
    cancelHandlers(replicatesToNode, "Node left");
}
/**
 * Reports whether {@link #copyStateMap} currently holds an entry keyed by the given
 * {@link ReplicationCheckpoint}, as determined by {@link Map#containsKey(Object)}.
 *
 * @param replicationCheckpoint {@link ReplicationCheckpoint} to look up
 * @return {@code true} if the checkpoint is a key in the map
 */
boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) {
    final boolean alreadyCached = copyStateMap.containsKey(replicationCheckpoint);
    return alreadyCached;
}
/**
 * Returns the number of handlers currently registered in {@link #allocationIdToHandlers}.
 *
 * @return the handler count
 */
int size() {
    final int registeredHandlers = allocationIdToHandlers.size();
    return registeredHandlers;
}
/**
 * Returns the number of entries currently held in {@link #copyStateMap}.
 *
 * @return the cached CopyState count
 */
int cachedCopyStateSize() {
    final int cachedStates = copyStateMap.size();
    return cachedStates;
}
/**
 * Builds a {@link SegmentReplicationSourceHandler} for one target.
 * The thread pool is taken from the CopyState's shard; the chunk size (converted to an
 * {@code int} with {@link Math#toIntExact(long)}) and the max concurrent file chunks come
 * from {@link #recoverySettings}.
 *
 * @param node            {@link DiscoveryNode} the handler sends to
 * @param copyState       {@link CopyState} the handler copies from
 * @param allocationId    allocation ID passed to the handler
 * @param fileChunkWriter {@link FileChunkWriter} passed to the handler
 * @return the new handler
 */
private SegmentReplicationSourceHandler createTargetHandler(
    DiscoveryNode node,
    CopyState copyState,
    String allocationId,
    FileChunkWriter fileChunkWriter
) {
    final int chunkSizeInBytes = Math.toIntExact(recoverySettings.getChunkSize().getBytes());
    final int maxConcurrentFileChunks = recoverySettings.getMaxConcurrentFileChunks();
    return new SegmentReplicationSourceHandler(
        node,
        fileChunkWriter,
        copyState.getShard().getThreadPool(),
        copyState,
        allocationId,
        chunkSizeInBytes,
        maxConcurrentFileChunks
    );
}
/**
 * Stores the given {@link CopyState} in {@link #copyStateMap} under the supplied
 * {@link ReplicationCheckpoint}, unless that checkpoint already has an entry
 * ({@link Map#putIfAbsent(Object, Object)} semantics).
 *
 * @param checkpoint {@link ReplicationCheckpoint} used as the map key
 * @param copyState  {@link CopyState} to cache
 */
private void addToCopyStateMap(ReplicationCheckpoint checkpoint, CopyState copyState) {
    // An existing mapping for this checkpoint is left untouched.
    copyStateMap.putIfAbsent(checkpoint, copyState);
}
/**
 * Looks up the {@link CopyState} stored in {@link #copyStateMap} for the given
 * {@link ReplicationCheckpoint}.
 *
 * @param replicationCheckpoint {@link ReplicationCheckpoint} key to look up
 * @return the mapped {@link CopyState}, or {@code null} if the map has no entry for the key
 */
private CopyState fetchFromCopyStateMap(ReplicationCheckpoint replicationCheckpoint) {
    final CopyState mappedState = copyStateMap.get(replicationCheckpoint);
    return mappedState;
}
/**
 * Releases one reference on the given {@link CopyState}. Intended to be called after a
 * replication event completes. The entry keyed by the CopyState's requested checkpoint is
 * removed from {@link #copyStateMap} only when {@code decRef()} returns {@code true}.
 *
 * @param copyState {@link CopyState} to release
 */
private synchronized void removeCopyState(CopyState copyState) {
    final boolean lastReferenceReleased = copyState.decRef();
    if (lastReferenceReleased) {
        copyStateMap.remove(copyState.getRequestedReplicationCheckpoint());
    }
}
/**
 * Cancels every handler in {@link #allocationIdToHandlers} that matches the given predicate.
 * Matching allocation IDs are collected first and then each one is passed to
 * {@link #cancel(String, String)}, which removes the handler and releases its CopyState reference.
 *
 * @param predicate filter selecting the handlers to cancel
 * @param reason    reason passed to each cancelled handler
 */
private void cancelHandlers(Predicate<? super SegmentReplicationSourceHandler> predicate, String reason) {
    // Collect the IDs up front; the cancel calls below remove entries from the map.
    final List<String> matchingAllocationIds = allocationIdToHandlers.values()
        .stream()
        .filter(predicate)
        .map(matched -> matched.getAllocationId())
        .collect(Collectors.toList());
    matchingAllocationIds.forEach(matchingId -> cancel(matchingId, reason));
}
}