From 9ff9b3308de8ad60e006c2728685bb3d11b4d4d2 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Wed, 6 Dec 2023 16:00:05 -0600 Subject: [PATCH] fixing RollupIndexerStateTests --- .../core/indexing/AsyncTwoPhaseIndexer.java | 75 ++++++++++--------- .../persistence/LimitAwareBulkIndexer.java | 10 +-- 2 files changed, 45 insertions(+), 40 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index 74bf43c123a98..1e25a6436bd04 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -522,44 +522,49 @@ private void onSearchResponse(SearchResponse searchResponse) { } final BulkRequest bulkRequest = new BulkRequest(); - iterationResult.getToIndex().forEach(bulkRequest::add); - stats.markEndProcessing(); - - // an iteration result might return an empty set of documents to be indexed - if (bulkRequest.numberOfActions() > 0) { - stats.markStartIndexing(); - doNextBulk(bulkRequest, ActionListener.releaseAfter(ActionListener.wrap(bulkResponse -> { - // TODO we should check items in the response and move after accordingly to - // resume the failing buckets ? - if (bulkResponse.hasFailures()) { - logger.warn("Error while attempting to bulk index documents: {}", bulkResponse.buildFailureMessage()); - } - stats.incrementNumOutputDocuments(bulkResponse.getItems().length); - // There is no reason to do a `checkState` here and prevent the indexer from continuing - // As we have already indexed the documents, updated the stats, etc. - // We do an another `checkState` in `onBulkResponse` which will stop the indexer if necessary - // And, we will still be at our new position due to setting it here. - JobPosition newPosition = iterationResult.getPosition(); - position.set(newPosition); - - onBulkResponse(bulkResponse, newPosition); - }, this::finishWithIndexingFailure), bulkRequest)); - } else { - // no documents need to be indexed, continue with search - try { - JobPosition newPosition = iterationResult.getPosition(); - position.set(newPosition); + try { + iterationResult.getToIndex().forEach(bulkRequest::add); + stats.markEndProcessing(); - if (triggerSaveState()) { - doSaveState(IndexerState.INDEXING, newPosition, this::nextSearch); - } else { - nextSearch(); + // an iteration result might return an empty set of documents to be indexed + if (bulkRequest.numberOfActions() > 0) { + stats.markStartIndexing(); + doNextBulk(bulkRequest, ActionListener.releaseAfter(ActionListener.wrap(bulkResponse -> { + // TODO we should check items in the response and move after accordingly to + // resume the failing buckets ? + if (bulkResponse.hasFailures()) { + logger.warn("Error while attempting to bulk index documents: {}", bulkResponse.buildFailureMessage()); + } + stats.incrementNumOutputDocuments(bulkResponse.getItems().length); + // There is no reason to do a `checkState` here and prevent the indexer from continuing + // As we have already indexed the documents, updated the stats, etc. + // We do an another `checkState` in `onBulkResponse` which will stop the indexer if necessary + // And, we will still be at our new position due to setting it here. + JobPosition newPosition = iterationResult.getPosition(); + position.set(newPosition); + + onBulkResponse(bulkResponse, newPosition); + }, this::finishWithIndexingFailure), bulkRequest)); + } else { + // no documents need to be indexed, continue with search + try { + JobPosition newPosition = iterationResult.getPosition(); + position.set(newPosition); + + if (triggerSaveState()) { + doSaveState(IndexerState.INDEXING, newPosition, this::nextSearch); + } else { + nextSearch(); + } + } catch (Exception e) { + finishWithFailure(e); + } finally { + bulkRequest.close(); } - } catch (Exception e) { - finishWithFailure(e); - } finally { - bulkRequest.close(); } + } catch (Exception e) { + bulkRequest.close(); + throw e; } } catch (Exception e) { finishWithSearchFailure(e); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/LimitAwareBulkIndexer.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/LimitAwareBulkIndexer.java index f06b300d12e32..83d48425d01a4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/LimitAwareBulkIndexer.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/LimitAwareBulkIndexer.java @@ -70,10 +70,10 @@ private void execute() { @Override public void close() { - try { - execute(); - } finally { - currentBulkRequest.close(); - } + try { + execute(); + } finally { + currentBulkRequest.close(); } + } }