Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for stuck update action in a bulk with retry_on_conflict property #11153

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix passing wrong parameter when calling newConfigurationException() in DotExpanderProcessor ([#10737](https://github.com/opensearch-project/OpenSearch/pull/10737))
- Fix SuggestSearch.testSkipDuplicates by forcing refresh when indexing its test documents ([#11068](https://github.com/opensearch-project/OpenSearch/pull/11068))
- Fix per request latency last phase not tracked ([#10934](https://github.com/opensearch-project/OpenSearch/pull/10934))
- Fix for stuck update action in a bulk with `retry_on_conflict` property ([#11152](https://github.com/opensearch-project/OpenSearch/issues/11152))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ public void resetForExecutionForRetry() {
currentItemState = ItemProcessingState.INITIAL;
requestToExecute = null;
executionResult = null;
retryCounter++;
raghuvanshraj marked this conversation as resolved.
Show resolved Hide resolved
assertInvariants(ItemProcessingState.INITIAL);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,15 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
Expand Down Expand Up @@ -947,6 +951,82 @@ public void testRetries() throws Exception {
latch.await();
}

/**
 * Checks that a bulk shard request made only of update items with a {@code retry_on_conflict} value
 * terminates when every index operation on the (mocked) primary shard reports a version conflict.
 * <p>
 * Each item is expected to end with a {@link VersionConflictEngineException} failure, and
 * {@code UpdateHelper#prepare} is expected to have been invoked {@code retryOnConflict + 1} times per item
 * (the initial attempt plus one call per allowed retry).
 *
 * @throws IOException          declared because the mocked shard/index APIs used in the setup declare it
 * @throws InterruptedException if the test thread is interrupted while waiting for the bulk to finish
 */
public void testUpdateWithRetryOnConflict() throws IOException, InterruptedException {
    IndexSettings indexSettings = new IndexSettings(indexMetadata(), Settings.EMPTY);

    // Build between 2 and 5 update items, each with its own random retry budget (0..3).
    int nItems = randomIntBetween(2, 5);
    List<BulkItemRequest> items = new ArrayList<>(nItems);
    for (int i = 0; i < nItems; i++) {
        int retryOnConflictCount = randomIntBetween(0, 3);
        logger.debug("Setting retryCount for item {}: {}", i, retryOnConflictCount);
        UpdateRequest updateRequest = new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value")
            .retryOnConflict(retryOnConflictCount);
        items.add(new BulkItemRequest(i, updateRequest));
    }

    // The index request that the mocked UpdateHelper hands back as the translated update.
    IndexRequest updateResponse = new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE, "field", "value");

    Exception err = new VersionConflictEngineException(shardId, "id", "I'm conflicted <(;_;)>");
    Engine.IndexResult conflictedResult = new Engine.IndexResult(err, 0);

    // Every index operation on the primary answers with the same version-conflict result.
    IndexShard shard = mock(IndexShard.class);
    when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())).thenAnswer(
        ir -> conflictedResult
    );
    when(shard.indexSettings()).thenReturn(indexSettings);
    when(shard.shardId()).thenReturn(shardId);
    when(shard.mapperService()).thenReturn(mock(MapperService.class));

    UpdateHelper updateHelper = mock(UpdateHelper.class);
    when(updateHelper.prepare(any(), eq(shard), any())).thenReturn(
        new UpdateHelper.Result(
            updateResponse,
            randomBoolean() ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED,
            Collections.singletonMap("field", "value"),
            Requests.INDEX_CONTENT_TYPE
        )
    );

    BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items.toArray(BulkItemRequest[]::new));

    final CountDownLatch latch = new CountDownLatch(1);
    Runnable runnable = () -> TransportShardBulkAction.performOnPrimary(
        bulkShardRequest,
        shard,
        updateHelper,
        threadPool::absoluteTimeInMillis,
        new NoopMappingUpdatePerformer(),
        listener -> listener.onResponse(null),
        new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> {
            assertEquals(nItems, result.replicaRequest().items().length);
            for (BulkItemRequest item : result.replicaRequest().items()) {
                assertEquals(VersionConflictEngineException.class, item.getPrimaryResponse().getFailure().getCause().getClass());
            }
        }), latch),
        threadPool,
        Names.WRITE
    );

    // execute the runnable on a separate thread so that the infinite loop can be detected
    new Thread(runnable).start();

    // timeout the request in 10 seconds if there is an infinite loop
    assertTrue("bulk update with retry_on_conflict did not complete within 10 seconds", latch.await(10, TimeUnit.SECONDS));

    items.forEach(item -> {
        // expected value first, so a failure message reports expected/actual the right way round
        assertEquals(VersionConflictEngineException.class, item.getPrimaryResponse().getFailure().getCause().getClass());

        // this assertion is based on the assumption that all bulk item requests are updates and are hence calling
        // UpdateRequest::prepareRequest
        UpdateRequest updateRequest = (UpdateRequest) item.request();
        verify(updateHelper, times(updateRequest.retryOnConflict() + 1)).prepare(
            eq(updateRequest),
            any(IndexShard.class),
            any(LongSupplier.class)
        );
    });
}

public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception {
TestThreadPool rejectingThreadPool = new TestThreadPool(
"TransportShardBulkActionTests#testForceExecutionOnRejectionAfterMappingUpdate",
Expand Down
Loading