Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix up spotless #7

Merged
merged 1 commit into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class GatedDelegateRefreshListener implements ReferenceManager.RefreshLis
*/
GatedDelegateRefreshListener(ReferenceManager.RefreshListener delegateListener, @Nullable MeanMetric refreshListenerMetrics) {
this.delegateListener = delegateListener;
//TODO instrument metrics for listeners
// TODO instrument metrics for listeners
this.refreshListenerMetrics = refreshListenerMetrics;
}

Expand Down Expand Up @@ -79,7 +79,7 @@ private void handleDelegate(Runnable delegate) {
// this should never happen, if it does something is deeply wrong
throw new TimeoutException("failed to obtain permit but operations are not delayed");
}
} catch(InterruptedException | TimeoutException e){
} catch (InterruptedException | TimeoutException e) {
throw new RuntimeException("Failed to handle delegate due to ", e);
}
}
Expand Down
36 changes: 20 additions & 16 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -803,13 +803,13 @@ public void relocated(
final Runnable performSegRep
) throws IllegalIndexShardStateException, IllegalStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
//Force refreshes pending refresh listeners
// Force refreshes pending refresh listeners
try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
//Prevents new refresh listeners to be registered while all permits are acquired
// Prevents new refresh listeners to be registered while all permits are acquired
forceRefreshes.close();
//Ensures all in-flight remote store operations drain, before we hand-off.
internalRefreshListeners.stream().filter(i -> i instanceof Closeable).map(i -> (Closeable)i).close();
// Ensures all in-flight remote store operations drain, before we hand-off.
internalRefreshListeners.stream().filter(i -> i instanceof Closeable).map(i -> (Closeable) i).close();

boolean syncTranslog = isRemoteTranslogEnabled() && Durability.ASYNC == indexSettings.getTranslogDurability();
// Since all the index permits are acquired at this point, the translog buffer will not change.
Expand All @@ -821,9 +821,9 @@ public void relocated(
// no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
: "in-flight operations in progress while moving shard state to relocated";
//Run a round of segrep while we are waiting for the permits. We need to evaluate if this needs to be put inside
//the permit to synchronise segments. However since indexing operations are stalled, we need to justify the cost
//of blocking segrep round, which for remote store enabled nodes will require operations to drain on the remote store.
// Run a round of segrep while we are waiting for the permits. We need to evaluate if this needs to be put inside
// the permit to synchronise segments. However since indexing operations are stalled, we need to justify the cost
// of blocking segrep round, which for remote store enabled nodes will require operations to drain on the remote store.
performSegRep.run();

/*
Expand Down Expand Up @@ -3670,19 +3670,23 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro

internalRefreshListeners.add(new RefreshMetricUpdater(refreshMetric));
if (isRemoteStoreEnabled()) {
internalRefreshListeners.add(new GatedDelegateRefreshListener(
new RemoteStoreRefreshListener(
this,
// Add the checkpoint publisher if the Segment Replciation via remote store is enabled.
indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY,
remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId())
), refreshListenerMetric)
internalRefreshListeners.add(
new GatedDelegateRefreshListener(
new RemoteStoreRefreshListener(
this,
// Add the checkpoint publisher if the Segment Replciation via remote store is enabled.
indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY,
remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId())
),
refreshListenerMetric
)
);
}

if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
internalRefreshListeners.add(new GatedDelegateRefreshListener(
new CheckpointRefreshListener(this, this.checkpointPublisher), null));
internalRefreshListeners.add(
new GatedDelegateRefreshListener(new CheckpointRefreshListener(this, this.checkpointPublisher), null)
);
}
/**
* With segment replication enabled for primary relocation, recover replica shard initially as read only and
Expand Down