Skip to content

Commit

Permalink
Merge pull request #7 from Bukhtawar/ashish-8345
Browse files Browse the repository at this point in the history
Fix up spotless
  • Loading branch information
Bukhtawar authored Jul 11, 2023
2 parents 1508ada + 53f61f4 commit d655d99
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class GatedDelegateRefreshListener implements ReferenceManager.RefreshLis
*/
GatedDelegateRefreshListener(ReferenceManager.RefreshListener delegateListener, @Nullable MeanMetric refreshListenerMetrics) {
this.delegateListener = delegateListener;
//TODO instrument metrics for listeners
// TODO instrument metrics for listeners
this.refreshListenerMetrics = refreshListenerMetrics;
}

Expand Down Expand Up @@ -79,7 +79,7 @@ private void handleDelegate(Runnable delegate) {
// this should never happen, if it does something is deeply wrong
throw new TimeoutException("failed to obtain permit but operations are not delayed");
}
} catch(InterruptedException | TimeoutException e){
} catch (InterruptedException | TimeoutException e) {
throw new RuntimeException("Failed to handle delegate due to ", e);
}
}
Expand Down
36 changes: 20 additions & 16 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -803,13 +803,13 @@ public void relocated(
final Runnable performSegRep
) throws IllegalIndexShardStateException, IllegalStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
//Force refreshes pending refresh listeners
// Force refreshes pending refresh listeners
try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
//Prevents new refresh listeners to be registered while all permits are acquired
// Prevents new refresh listeners to be registered while all permits are acquired
forceRefreshes.close();
//Ensures all in-flight remote store operations drain, before we hand-off.
internalRefreshListeners.stream().filter(i -> i instanceof Closeable).map(i -> (Closeable)i).close();
// Ensures all in-flight remote store operations drain, before we hand-off.
internalRefreshListeners.stream().filter(i -> i instanceof Closeable).map(i -> (Closeable) i).close();

boolean syncTranslog = isRemoteTranslogEnabled() && Durability.ASYNC == indexSettings.getTranslogDurability();
// Since all the index permits are acquired at this point, the translog buffer will not change.
Expand All @@ -821,9 +821,9 @@ public void relocated(
// no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
: "in-flight operations in progress while moving shard state to relocated";
//Run a round of segrep while we are waiting for the permits. We need to evaluate if this needs to be put inside
//the permit to synchronise segments. However since indexing operations are stalled, we need to justify the cost
//of blocking segrep round, which for remote store enabled nodes will require operations to drain on the remote store.
// Run a round of segrep while we are waiting for the permits. We need to evaluate if this needs to be put inside
// the permit to synchronise segments. However since indexing operations are stalled, we need to justify the cost
// of blocking segrep round, which for remote store enabled nodes will require operations to drain on the remote store.
performSegRep.run();

/*
Expand Down Expand Up @@ -3670,19 +3670,23 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro

internalRefreshListeners.add(new RefreshMetricUpdater(refreshMetric));
if (isRemoteStoreEnabled()) {
internalRefreshListeners.add(new GatedDelegateRefreshListener(
new RemoteStoreRefreshListener(
this,
// Add the checkpoint publisher if the Segment Replciation via remote store is enabled.
indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY,
remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId())
), refreshListenerMetric)
internalRefreshListeners.add(
new GatedDelegateRefreshListener(
new RemoteStoreRefreshListener(
this,
// Add the checkpoint publisher if the Segment Replciation via remote store is enabled.
indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY,
remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId())
),
refreshListenerMetric
)
);
}

if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
internalRefreshListeners.add(new GatedDelegateRefreshListener(
new CheckpointRefreshListener(this, this.checkpointPublisher), null));
internalRefreshListeners.add(
new GatedDelegateRefreshListener(new CheckpointRefreshListener(this, this.checkpointPublisher), null)
);
}
/**
* With segment replication enabled for primary relocation, recover replica shard initially as read only and
Expand Down

0 comments on commit d655d99

Please sign in to comment.