Skip to content

Commit

Permalink
OpenSearch Sink: Add log messages when there is no exception (#3532)
Browse files Browse the repository at this point in the history
Add log messages when there is exception

Signed-off-by: Kondaka <krishkdk@amazon.com>
  • Loading branch information
kkondaka authored Oct 31, 2023
1 parent c560e1f commit a30fdb5
Showing 1 changed file with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,13 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating
if (doRetry) {
if (retryCount % 5 == 0) {
LOG.warn("Bulk Operation Failed. Number of retries {}. Retrying... ", retryCount, e);
if (e == null) {
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
if (Objects.nonNull(bulkItemResponse.error())) {
LOG.warn("operation = {}, error = {}", bulkItemResponse.operationType(), bulkItemResponse.error());
}
}
}
}
bulkRequestNumberOfRetries.increment();
return new BulkOperationRequestResponse(bulkRequestForRetry, bulkResponse);
Expand All @@ -248,7 +255,13 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating
}

private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> bulkRequest, final BulkResponse bulkResponse, final Throwable failure) {
if (Objects.isNull(failure)) {
LOG.warn("Bulk Operation Failed.", failure);
if (failure == null) {
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
if (Objects.nonNull(bulkItemResponse.error())) {
LOG.warn("operation = {}, error = {}", bulkItemResponse.operationType(), bulkItemResponse.error());
}
}
handleFailures(bulkRequest, bulkResponse.items());
} else {
handleFailures(bulkRequest, failure);
Expand Down

0 comments on commit a30fdb5

Please sign in to comment.