Skip to content

Commit

Permalink
fixing RollupIndexerStateTests
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Dec 6, 2023
1 parent 3de223b commit 9ff9b33
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -522,44 +522,49 @@ private void onSearchResponse(SearchResponse searchResponse) {
}

final BulkRequest bulkRequest = new BulkRequest();
iterationResult.getToIndex().forEach(bulkRequest::add);
stats.markEndProcessing();

// an iteration result might return an empty set of documents to be indexed
if (bulkRequest.numberOfActions() > 0) {
stats.markStartIndexing();
doNextBulk(bulkRequest, ActionListener.releaseAfter(ActionListener.wrap(bulkResponse -> {
// TODO we should check items in the response and move after accordingly to
// resume the failing buckets ?
if (bulkResponse.hasFailures()) {
logger.warn("Error while attempting to bulk index documents: {}", bulkResponse.buildFailureMessage());
}
stats.incrementNumOutputDocuments(bulkResponse.getItems().length);
// There is no reason to do a `checkState` here and prevent the indexer from continuing
// As we have already indexed the documents, updated the stats, etc.
// We do an another `checkState` in `onBulkResponse` which will stop the indexer if necessary
// And, we will still be at our new position due to setting it here.
JobPosition newPosition = iterationResult.getPosition();
position.set(newPosition);

onBulkResponse(bulkResponse, newPosition);
}, this::finishWithIndexingFailure), bulkRequest));
} else {
// no documents need to be indexed, continue with search
try {
JobPosition newPosition = iterationResult.getPosition();
position.set(newPosition);
try {
iterationResult.getToIndex().forEach(bulkRequest::add);
stats.markEndProcessing();

if (triggerSaveState()) {
doSaveState(IndexerState.INDEXING, newPosition, this::nextSearch);
} else {
nextSearch();
// an iteration result might return an empty set of documents to be indexed
if (bulkRequest.numberOfActions() > 0) {
stats.markStartIndexing();
doNextBulk(bulkRequest, ActionListener.releaseAfter(ActionListener.wrap(bulkResponse -> {
// TODO we should check items in the response and move after accordingly to
// resume the failing buckets ?
if (bulkResponse.hasFailures()) {
logger.warn("Error while attempting to bulk index documents: {}", bulkResponse.buildFailureMessage());
}
stats.incrementNumOutputDocuments(bulkResponse.getItems().length);
// There is no reason to do a `checkState` here and prevent the indexer from continuing
// As we have already indexed the documents, updated the stats, etc.
// We do an another `checkState` in `onBulkResponse` which will stop the indexer if necessary
// And, we will still be at our new position due to setting it here.
JobPosition newPosition = iterationResult.getPosition();
position.set(newPosition);

onBulkResponse(bulkResponse, newPosition);
}, this::finishWithIndexingFailure), bulkRequest));
} else {
// no documents need to be indexed, continue with search
try {
JobPosition newPosition = iterationResult.getPosition();
position.set(newPosition);

if (triggerSaveState()) {
doSaveState(IndexerState.INDEXING, newPosition, this::nextSearch);
} else {
nextSearch();
}
} catch (Exception e) {
finishWithFailure(e);
} finally {
bulkRequest.close();
}
} catch (Exception e) {
finishWithFailure(e);
} finally {
bulkRequest.close();
}
} catch (Exception e) {
bulkRequest.close();
throw e;
}
} catch (Exception e) {
finishWithSearchFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ private void execute() {

@Override
public void close() {
try {
execute();
} finally {
currentBulkRequest.close();
}
try {
execute();
} finally {
currentBulkRequest.close();
}
}
}

0 comments on commit 9ff9b33

Please sign in to comment.