Skip to content

Commit

Permalink
[segment replication] decouple the rateLimiter of segrep and recovery (#12939)
Browse files Browse the repository at this point in the history

add setting "segrep.max_bytes_per_sec"

Signed-off-by: maxliu <ly_chinese@163.com>
  • Loading branch information
Ferrari248 committed Mar 28, 2024
1 parent 8ad0dc0 commit 8b844fe
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER,
ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES,
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.SEGREP_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ public class RecoverySettings {
Property.NodeScope
);

/**
 * Controls the maximum bytes-per-second consumed when sending segment replication
 * file chunks, independently of the recovery rate limit. Defaults to {@code 0};
 * a non-positive value disables segrep-specific throttling (no rate limiter is
 * created for values {@code <= 0}). Dynamically updatable at the node scope.
 */
public static final Setting<ByteSizeValue> SEGREP_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting(
    "segrep.max_bytes_per_sec",
    new ByteSizeValue(0),
    Property.Dynamic,
    Property.NodeScope
);

/**
* Controls the maximum number of file chunk requests that can be sent concurrently from the source node to the target node.
*/
Expand Down Expand Up @@ -170,10 +177,12 @@ public class RecoverySettings {
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES);

private volatile ByteSizeValue maxBytesPerSec;
private volatile ByteSizeValue segrepMaxBytesPerSec;
private volatile int maxConcurrentFileChunks;
private volatile int maxConcurrentOperations;
private volatile int maxConcurrentRemoteStoreStreams;
private volatile SimpleRateLimiter rateLimiter;
private volatile SimpleRateLimiter segrepRateLimiter;
private volatile TimeValue retryDelayStateSync;
private volatile TimeValue retryDelayNetwork;
private volatile TimeValue activityTimeout;
Expand Down Expand Up @@ -204,11 +213,18 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
} else {
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
}
this.segrepMaxBytesPerSec = SEGREP_MAX_BYTES_PER_SEC_SETTING.get(settings);
if (segrepMaxBytesPerSec.getBytes() <= 0) {
segrepRateLimiter = null;
} else {
segrepRateLimiter = new SimpleRateLimiter(segrepMaxBytesPerSec.getMbFrac());
}

logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);
this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings);

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(SEGREP_MAX_BYTES_PER_SEC_SETTING, this::setSegrepMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, this::setMaxConcurrentOperations);
clusterSettings.addSettingsUpdateConsumer(
Expand All @@ -231,6 +247,10 @@ public RateLimiter rateLimiter() {
return rateLimiter;
}

/**
 * Returns the rate limiter applied to segment replication file chunks, or
 * {@code null} when segrep throttling is disabled
 * ({@code segrep.max_bytes_per_sec <= 0}).
 */
public RateLimiter segrepRateLimiter() {
    return segrepRateLimiter;
}

/**
 * Returns the currently configured retry delay used for network-related recovery
 * retries (backed by {@link #INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING} —
 * assumed from the field/setting naming; confirm against the constructor wiring).
 */
public TimeValue retryDelayNetwork() {
    return retryDelayNetwork;
}
Expand Down Expand Up @@ -305,6 +325,17 @@ private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
}
}

/**
 * Dynamic-setting consumer for {@code segrep.max_bytes_per_sec}.
 * <p>
 * A non-positive value disables throttling by dropping the limiter; otherwise the
 * existing limiter is retuned in place (preserving its accumulated state) or a new
 * one is created on first use.
 *
 * @param segrepMaxBytesPerSec the new maximum segment replication throughput
 */
private void setSegrepMaxBytesPerSec(ByteSizeValue segrepMaxBytesPerSec) {
    this.segrepMaxBytesPerSec = segrepMaxBytesPerSec;
    // Guard clause: zero or negative means "no throttling at all".
    if (segrepMaxBytesPerSec.getBytes() <= 0) {
        segrepRateLimiter = null;
        return;
    }
    final double mbPerSec = segrepMaxBytesPerSec.getMbFrac();
    if (segrepRateLimiter == null) {
        segrepRateLimiter = new SimpleRateLimiter(mbPerSec);
    } else {
        segrepRateLimiter.setMBPerSec(mbPerSec);
    }
}

/**
 * Returns the maximum number of file chunk requests that may be in flight
 * concurrently from a recovery source node to a target node (see the setting
 * documented at the top of this class).
 */
public int getMaxConcurrentFileChunks() {
    return maxConcurrentFileChunks;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,12 @@ public void writeFileChunk(
// Pause using the rate limiter, if desired, to throttle the recovery
final long throttleTimeInNanos;
// always fetch the ratelimiter - it might be updated in real-time on the recovery settings
final RateLimiter rl = recoverySettings.rateLimiter();
final RateLimiter rl;
if (SegmentReplicationTargetService.Actions.FILE_CHUNK.equals(action)) {
rl = recoverySettings.segrepRateLimiter();
} else {
rl = recoverySettings.rateLimiter();
}
if (rl != null) {
long bytes = bytesSinceLastPause.addAndGet(content.length());
if (bytes > rl.getMinPauseCheckBytes()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha
try (ReplicationRef<SegmentReplicationTarget> ref = onGoingReplications.getSafe(request.recoveryId(), request.shardId())) {
final SegmentReplicationTarget target = ref.get();
final ActionListener<Void> listener = target.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.rateLimiter(), listener);
target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.segrepRateLimiter(), listener);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public void testZeroBytesPerSecondIsNoRateLimit() {
Settings.builder().put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build()
);
assertEquals(null, recoverySettings.rateLimiter());
clusterSettings.applySettings(
Settings.builder().put(RecoverySettings.SEGREP_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build()
);
assertEquals(null, recoverySettings.segrepRateLimiter());
}

public void testRetryDelayStateSync() {
Expand Down

0 comments on commit 8b844fe

Please sign in to comment.