From b3fa3246654ed4a30bf90f3bc50d448ace10a314 Mon Sep 17 00:00:00 2001 From: Raghuvansh Raj Date: Fri, 10 Nov 2023 07:00:31 +0530 Subject: [PATCH 1/3] Bugfix for update staying stuck when sent as part of bulk with retry_on_conflict specified Signed-off-by: Raghuvansh Raj --- CHANGELOG.md | 1 + .../bulk/BulkPrimaryExecutionContext.java | 1 + .../bulk/TransportShardBulkActionTests.java | 63 +++++++++++++++++++ 3 files changed, 65 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 95baa2e88c04a..7c285741985f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625)) - [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853)) - Add back half_float BKD based sort query optimization ([#11024](https://github.com/opensearch-project/OpenSearch/pull/11024)) +- Bugfix for update staying stuck when sent as part of bulk with `retry_on_conflict` specified ([#11152](https://github.com/opensearch-project/OpenSearch/issues/11152)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java b/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java index 896456089ee3e..4e770f5851bc6 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkPrimaryExecutionContext.java @@ -232,6 +232,7 @@ public void resetForExecutionForRetry() { currentItemState = ItemProcessingState.INITIAL; requestToExecute = null; executionResult = null; + retryCounter++; assertInvariants(ItemProcessingState.INITIAL); } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java index b325cfa197933..99c311c11e33f 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java @@ -101,6 +101,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.CoreMatchers.equalTo; @@ -947,6 +948,68 @@ public void testRetries() throws Exception { latch.await(); } + public void testUpdateWithRetryOnConflict() throws IOException, InterruptedException { + IndexSettings indexSettings = new IndexSettings(indexMetadata(), Settings.EMPTY); + UpdateRequest writeRequest = new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value"); + writeRequest.retryOnConflict(3); + BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); + + IndexRequest updateResponse = new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE, "field", "value"); + + Exception err = new VersionConflictEngineException(shardId, "id", "I'm conflicted <(;_;)>"); + Engine.IndexResult conflictedResult = new Engine.IndexResult(err, 0); + + IndexShard shard = mock(IndexShard.class); + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())).thenAnswer( + ir -> conflictedResult + ); + when(shard.indexSettings()).thenReturn(indexSettings); + when(shard.shardId()).thenReturn(shardId); + when(shard.mapperService()).thenReturn(mock(MapperService.class)); + + UpdateHelper updateHelper = mock(UpdateHelper.class); + when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + new UpdateHelper.Result( + updateResponse, + randomBoolean() ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED, + Collections.singletonMap("field", "value"), + Requests.INDEX_CONTENT_TYPE + ) + ); + + BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest }; + BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + + final CountDownLatch latch = new CountDownLatch(1); + Runnable runnable = () -> TransportShardBulkAction.performOnPrimary( + bulkShardRequest, + shard, + updateHelper, + threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), + listener -> listener.onResponse(null), + new LatchedActionListener<>( + ActionTestUtils.assertNoFailureListener( + result -> assertEquals( + VersionConflictEngineException.class, + result.replicaRequest().items()[0].getPrimaryResponse().getFailure().getCause().getClass() + ) + ), + latch + ), + threadPool, + Names.WRITE + ); + + // execute the runnable on a separate thread so that the infinite loop can be detected + Thread thread = new Thread(runnable); + thread.start(); + + // timeout the request in 10 seconds if there is an infinite loop + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertEquals(items[0].getPrimaryResponse().getFailure().getCause().getClass(), VersionConflictEngineException.class); + } + public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { TestThreadPool rejectingThreadPool = new TestThreadPool( "TransportShardBulkActionTests#testForceExecutionOnRejectionAfterMappingUpdate", From eb5def57dc77be01c63225219d5c034e5aab5d69 Mon Sep 17 00:00:00 2001 From: Raghuvansh Raj Date: Fri, 10 Nov 2023 08:47:49 +0530 Subject: [PATCH 2/3] Addressed comments on CHANGELOG Signed-off-by: Raghuvansh Raj --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c285741985f6..5e027f3a9c9df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -156,6 +156,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix passing wrong parameter when calling newConfigurationException() in DotExpanderProcessor ([#10737](https://github.com/opensearch-project/OpenSearch/pull/10737)) - Fix SuggestSearch.testSkipDuplicates by forceing refresh when indexing its test documents ([#11068](https://github.com/opensearch-project/OpenSearch/pull/11068)) - Fix per request latency last phase not tracked ([#10934](https://github.com/opensearch-project/OpenSearch/pull/10934)) +- Fix for stuck update action in a bulk with `retry_on_conflict` property ([#11152](https://github.com/opensearch-project/OpenSearch/issues/11152)) ### Security From 635618593d326a1e34ce05038c39fd91f4273f87 Mon Sep 17 00:00:00 2001 From: Raghuvansh Raj Date: Sun, 12 Nov 2023 00:56:08 +0530 Subject: [PATCH 3/3] Added multiple items in UT with different retry counts and item level retry assertion Signed-off-by: Raghuvansh Raj --- CHANGELOG.md | 1 - .../bulk/TransportShardBulkActionTests.java | 51 ++++++++++++------- 2 files changed, 34 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5e027f3a9c9df..7bbfeba6021d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,7 +23,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625)) - [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853)) - Add back half_float BKD based sort query optimization ([#11024](https://github.com/opensearch-project/OpenSearch/pull/11024)) -- Bugfix for update staying stuck when sent as part of bulk with `retry_on_conflict` specified ([#11152](https://github.com/opensearch-project/OpenSearch/issues/11152)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java index 99c311c11e33f..65b555649b2d0 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java @@ -97,12 +97,15 @@ import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.LongSupplier; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; @@ -950,9 +953,16 @@ public void testRetries() throws Exception { public void testUpdateWithRetryOnConflict() throws IOException, InterruptedException { IndexSettings indexSettings = new IndexSettings(indexMetadata(), Settings.EMPTY); - UpdateRequest writeRequest = new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value"); - writeRequest.retryOnConflict(3); - BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); + + int nItems = randomIntBetween(2, 5); + List items = new ArrayList<>(nItems); + for (int i = 0; i < nItems; i++) { + int retryOnConflictCount = randomIntBetween(0, 3); + logger.debug("Setting retryCount for item {}: {}", i, retryOnConflictCount); + UpdateRequest updateRequest = new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value") + .retryOnConflict(retryOnConflictCount); + items.add(new BulkItemRequest(i, updateRequest)); + } IndexRequest updateResponse = new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE, "field", "value"); @@ -977,8 +987,7 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep ) ); - BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest }; - BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items.toArray(BulkItemRequest[]::new)); final CountDownLatch latch = new CountDownLatch(1); Runnable runnable = () -> TransportShardBulkAction.performOnPrimary( @@ -988,26 +997,34 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), listener -> listener.onResponse(null), - new LatchedActionListener<>( - ActionTestUtils.assertNoFailureListener( - result -> assertEquals( - VersionConflictEngineException.class, - result.replicaRequest().items()[0].getPrimaryResponse().getFailure().getCause().getClass() - ) - ), - latch - ), + new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> { + assertEquals(nItems, result.replicaRequest().items().length); + for (BulkItemRequest item : result.replicaRequest().items()) { + assertEquals(VersionConflictEngineException.class, item.getPrimaryResponse().getFailure().getCause().getClass()); + } + }), latch), threadPool, Names.WRITE ); // execute the runnable on a separate thread so that the infinite loop can be detected - Thread thread = new Thread(runnable); - thread.start(); + new Thread(runnable).start(); // timeout the request in 10 seconds if there is an infinite loop assertTrue(latch.await(10, TimeUnit.SECONDS)); - assertEquals(items[0].getPrimaryResponse().getFailure().getCause().getClass(), VersionConflictEngineException.class); + + items.forEach(item -> { + assertEquals(item.getPrimaryResponse().getFailure().getCause().getClass(), VersionConflictEngineException.class); + + // this assertion is based on the assumption that all bulk item requests are updates and are hence calling + // UpdateRequest::prepareRequest + UpdateRequest updateRequest = (UpdateRequest) item.request(); + verify(updateHelper, times(updateRequest.retryOnConflict() + 1)).prepare( + eq(updateRequest), + any(IndexShard.class), + any(LongSupplier.class) + ); + }); } public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception {