Skip to content

Commit

Permalink
Fixing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Dec 1, 2023
1 parent 08eebc5 commit 2a71c53
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,13 +269,18 @@ protected boolean accept(ScrollableHitSource.Hit doc) {

protected BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs) {
BulkRequest bulkRequest = new BulkRequest();
for (ScrollableHitSource.Hit doc : docs) {
if (accept(doc)) {
RequestWrapper<?> request = scriptApplier.apply(copyMetadata(buildRequest(doc), doc), doc);
if (request != null) {
bulkRequest.add(request.self());
try {
for (ScrollableHitSource.Hit doc : docs) {
if (accept(doc)) {
RequestWrapper<?> request = scriptApplier.apply(copyMetadata(buildRequest(doc), doc), doc);
if (request != null) {
bulkRequest.add(request.self());
}
}
}
} catch (Exception e) {
bulkRequest.close();
throw e;
}
return bulkRequest;
}
Expand Down Expand Up @@ -412,18 +417,18 @@ void prepareBulkRequest(long thisBatchStartTimeNS, ScrollConsumableHitsResponse
/*
* If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation.
*/
notifyDone(thisBatchStartTimeNS, asyncResponse, request);
notifyDone(thisBatchStartTimeNS, asyncResponse, 0);
return;
}
request.timeout(mainRequest.getTimeout());
request.waitForActiveShards(mainRequest.getWaitForActiveShards());
sendBulkRequest(request, () -> notifyDone(thisBatchStartTimeNS, asyncResponse, request), request::close);
sendBulkRequest(request, () -> notifyDone(thisBatchStartTimeNS, asyncResponse, request.requests().size()), request::close);
}

/**
* Send a bulk request, handling retries.
*/
void sendBulkRequest(BulkRequest request, Runnable onSuccess, Runnable onFailure) {
void sendBulkRequest(BulkRequest request, Runnable onSuccess, Runnable onCompletion) {
final int requestSize = request.requests().size();
if (logger.isDebugEnabled()) {
logger.debug(
Expand All @@ -435,19 +440,20 @@ void sendBulkRequest(BulkRequest request, Runnable onSuccess, Runnable onFailure
}
if (task.isCancelled()) {
logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
onCompletion.run();
finishHim(null);
return;
}
bulkRetry.withBackoff(bulkClient::bulk, request, new ActionListener<>() {
@Override
public void onResponse(BulkResponse response) {
logger.debug("[{}]: completed [{}] entry bulk request", task.getId(), requestSize);
onBulkResponse(response, onSuccess);
onBulkResponse(response, onSuccess, onCompletion);
}

@Override
public void onFailure(Exception e) {
onFailure.run();
onCompletion.run();
finishHim(e);
}
});
Expand All @@ -456,7 +462,7 @@ public void onFailure(Exception e) {
/**
* Processes bulk responses, accounting for failures.
*/
void onBulkResponse(BulkResponse response, Runnable onSuccess) {
void onBulkResponse(BulkResponse response, Runnable onSuccess, Runnable onCompletion) {
try {
List<Failure> failures = new ArrayList<>();
Set<String> destinationIndicesThisBatch = new HashSet<>();
Expand Down Expand Up @@ -513,12 +519,12 @@ void onBulkResponse(BulkResponse response, Runnable onSuccess) {
onSuccess.run();
} catch (Exception t) {
finishHim(t);
} finally {
onCompletion.run();
}
}

void notifyDone(long thisBatchStartTimeNS, ScrollConsumableHitsResponse asyncResponse, BulkRequest request) {
int batchSize = request.requests().size();
request.close();
void notifyDone(long thisBatchStartTimeNS, ScrollConsumableHitsResponse asyncResponse, int batchSize) {
if (task.isCancelled()) {
logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
finishHim(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ public void testBulkResponseSetsLotsOfStatus() throws Exception {
final IndexResponse response = new IndexResponse(shardId, "id" + i, seqNo, primaryTerm, randomInt(), createdResponse);
responses[i] = BulkItemResponse.success(i, opType, response);
}
assertExactlyOnce(onSuccess -> action.onBulkResponse(new BulkResponse(responses, 0), onSuccess));
assertExactlyOnce(onSuccess -> action.onBulkResponse(new BulkResponse(responses, 0), onSuccess, () -> {}));
assertEquals(versionConflicts, testTask.getStatus().getVersionConflicts());
assertEquals(updated, testTask.getStatus().getUpdated());
assertEquals(created, testTask.getStatus().getCreated());
Expand All @@ -350,7 +350,7 @@ public void testHandlesBulkWithNoScroll() {

// when receiving bulk response
var responses = randomArray(0, maxDocs, BulkItemResponse[]::new, AsyncBulkByScrollActionTests::createBulkResponse);
new DummyAsyncBulkByScrollAction().onBulkResponse(new BulkResponse(responses, 0), () -> fail("should not be called"));
new DummyAsyncBulkByScrollAction().onBulkResponse(new BulkResponse(responses, 0), () -> fail("should not be called"), () -> {});

// then should refresh and finish
assertThat(listener.isDone(), equalTo(true));
Expand All @@ -366,7 +366,7 @@ public void testHandlesBulkWhenMaxDocsIsReached() {

// when receiving bulk response with max docs
var responses = randomArray(size, size, BulkItemResponse[]::new, AsyncBulkByScrollActionTests::createBulkResponse);
new DummyAsyncBulkByScrollAction().onBulkResponse(new BulkResponse(responses, 0), () -> fail("should not be called"));
new DummyAsyncBulkByScrollAction().onBulkResponse(new BulkResponse(responses, 0), () -> fail("should not be called"), () -> {});

// then should refresh and finish
assertThat(listener.isDone(), equalTo(true));
Expand Down Expand Up @@ -466,7 +466,7 @@ public void testBulkFailuresAbortRequest() throws Exception {
new BulkItemResponse[] { BulkItemResponse.failure(0, DocWriteRequest.OpType.CREATE, failure) },
randomLong()
);
action.onBulkResponse(bulkResponse, Assert::fail);
action.onBulkResponse(bulkResponse, Assert::fail, () -> {});
BulkByScrollResponse response = listener.get();
assertThat(response.getBulkFailures(), contains(failure));
assertThat(response.getSearchFailures(), empty());
Expand Down Expand Up @@ -704,18 +704,27 @@ public void testCancelBeforeScrollResponse() throws Exception {
}

public void testCancelBeforeSendBulkRequest() throws Exception {
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.sendBulkRequest(new BulkRequest(), Assert::fail, () -> {}));
BulkRequest bulkRequest = new BulkRequest();
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.sendBulkRequest(bulkRequest, Assert::fail, bulkRequest::close));
}

public void testCancelBeforeOnBulkResponse() throws Exception {
cancelTaskCase(
(DummyAsyncBulkByScrollAction action) -> action.onBulkResponse(new BulkResponse(new BulkItemResponse[0], 0), Assert::fail)
(DummyAsyncBulkByScrollAction action) -> action.onBulkResponse(
new BulkResponse(new BulkItemResponse[0], 0),
Assert::fail,
() -> {}
)
);
}

public void testCancelBeforeStartNextScroll() throws Exception {
long now = System.nanoTime();
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> action.notifyDone(now, null, new BulkRequest()));
BulkRequest bulkRequest = new BulkRequest();
cancelTaskCase((DummyAsyncBulkByScrollAction action) -> {
action.notifyDone(now, null, bulkRequest.requests().size());
bulkRequest.close();
});
}

public void testCancelBeforeRefreshAndFinish() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand Down Expand Up @@ -263,6 +264,12 @@ private TestAuditor createTestAuditorWithoutTemplate(CountDownLatch latch) {

return null;
}).when(client).execute(eq(PutComposableIndexTemplateAction.INSTANCE), any(), any());
ArgumentCaptor<ActionListener<BulkResponse>> listenerCaptor = ArgumentCaptor.forClass(ActionListener.class);
doAnswer(invovation -> {
// We do this so that the listener is triggered in some way so that the AbstractAuditor releases the BulkRequest
listenerCaptor.getValue().onFailure(new RuntimeException());
return null;
}).when(client).execute(eq(BulkAction.INSTANCE), any(), listenerCaptor.capture());

IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
AdminClient adminClient = mock(AdminClient.class);
Expand Down

0 comments on commit 2a71c53

Please sign in to comment.