Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[segment replication] decouple the rateLimiter of segrep and recovery #12959

Merged
merged 9 commits into from
Apr 14, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER,
ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES,
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.SEGREP_MAX_BYTES_PER_SEC_SETTING,
Ferrari248 marked this conversation as resolved.
Show resolved Hide resolved
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ public class RecoverySettings {
Property.NodeScope
);

public static final Setting<ByteSizeValue> SEGREP_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting(
Ferrari248 marked this conversation as resolved.
Show resolved Hide resolved
"segrep.max_bytes_per_sec",
new ByteSizeValue(0),
Property.Dynamic,
Property.NodeScope
);

/**
* Controls the maximum number of file chunk requests that can be sent concurrently from the source node to the target node.
*/
Expand Down Expand Up @@ -170,10 +177,12 @@ public class RecoverySettings {
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES);

private volatile ByteSizeValue maxBytesPerSec;
private volatile ByteSizeValue segrepMaxBytesPerSec;
private volatile int maxConcurrentFileChunks;
private volatile int maxConcurrentOperations;
private volatile int maxConcurrentRemoteStoreStreams;
private volatile SimpleRateLimiter rateLimiter;
private volatile SimpleRateLimiter segrepRateLimiter;
private volatile TimeValue retryDelayStateSync;
private volatile TimeValue retryDelayNetwork;
private volatile TimeValue activityTimeout;
Expand Down Expand Up @@ -204,11 +213,18 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
} else {
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
}
this.segrepMaxBytesPerSec = SEGREP_MAX_BYTES_PER_SEC_SETTING.get(settings);
if (segrepMaxBytesPerSec.getBytes() <= 0) {
segrepRateLimiter = null;
} else {
segrepRateLimiter = new SimpleRateLimiter(segrepMaxBytesPerSec.getMbFrac());
}

logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);
this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings);

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(SEGREP_MAX_BYTES_PER_SEC_SETTING, this::setSegrepMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, this::setMaxConcurrentOperations);
clusterSettings.addSettingsUpdateConsumer(
Expand All @@ -231,6 +247,10 @@ public RateLimiter rateLimiter() {
return rateLimiter;
}

public RateLimiter segrepRateLimiter() {
return segrepRateLimiter;
}

public TimeValue retryDelayNetwork() {
return retryDelayNetwork;
}
Expand Down Expand Up @@ -305,6 +325,17 @@ private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
}
}

private void setSegrepMaxBytesPerSec(ByteSizeValue segrepMaxBytesPerSec) {
Ferrari248 marked this conversation as resolved.
Show resolved Hide resolved
this.segrepMaxBytesPerSec = segrepMaxBytesPerSec;
if (segrepMaxBytesPerSec.getBytes() <= 0) {
segrepRateLimiter = null;
} else if (segrepRateLimiter != null) {
segrepRateLimiter.setMBPerSec(segrepMaxBytesPerSec.getMbFrac());
} else {
segrepRateLimiter = new SimpleRateLimiter(segrepMaxBytesPerSec.getMbFrac());
}
}

public int getMaxConcurrentFileChunks() {
return maxConcurrentFileChunks;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,12 @@ public void writeFileChunk(
// Pause using the rate limiter, if desired, to throttle the recovery
final long throttleTimeInNanos;
// always fetch the ratelimiter - it might be updated in real-time on the recovery settings
final RateLimiter rl = recoverySettings.rateLimiter();
final RateLimiter rl;
if (SegmentReplicationTargetService.Actions.FILE_CHUNK.equals(action)) {
rl = recoverySettings.segrepRateLimiter();
} else {
rl = recoverySettings.rateLimiter();
}
if (rl != null) {
long bytes = bytesSinceLastPause.addAndGet(content.length());
if (bytes > rl.getMinPauseCheckBytes()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha
try (ReplicationRef<SegmentReplicationTarget> ref = onGoingReplications.getSafe(request.recoveryId(), request.shardId())) {
final SegmentReplicationTarget target = ref.get();
final ActionListener<Void> listener = target.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.rateLimiter(), listener);
target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.segrepRateLimiter(), listener);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public void testZeroBytesPerSecondIsNoRateLimit() {
Settings.builder().put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build()
);
assertEquals(null, recoverySettings.rateLimiter());
clusterSettings.applySettings(
Settings.builder().put(RecoverySettings.SEGREP_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build()
);
assertEquals(null, recoverySettings.segrepRateLimiter());
}

public void testRetryDelayStateSync() {
Expand Down
Loading