diff --git a/src/Nest/Document/Multiple/BulkAll/BulkAllObservable.cs b/src/Nest/Document/Multiple/BulkAll/BulkAllObservable.cs index 498b374e8c9..07e462ad2de 100644 --- a/src/Nest/Document/Multiple/BulkAll/BulkAllObservable.cs +++ b/src/Nest/Document/Multiple/BulkAll/BulkAllObservable.cs @@ -112,18 +112,18 @@ private async Task BulkAsync(IList buffer, long page, int b { _compositeCancelToken.ThrowIfCancellationRequested(); - var r = _partitionedBulkRequest; + var request = _partitionedBulkRequest; var response = await _client.BulkAsync(s => { - s.Index(r.Index).Type(r.Type); - if (r.BufferToBulk != null) r.BufferToBulk(s, buffer); + s.Index(request.Index).Type(request.Type); + if (request.BufferToBulk != null) request.BufferToBulk(s, buffer); else s.IndexMany(buffer); - if (!string.IsNullOrEmpty(r.Pipeline)) s.Pipeline(r.Pipeline); + if (!string.IsNullOrEmpty(request.Pipeline)) s.Pipeline(request.Pipeline); #pragma warning disable 618 - if (r.Refresh.HasValue) s.Refresh(r.Refresh.Value); + if (request.Refresh.HasValue) s.Refresh(request.Refresh.Value); #pragma warning restore 618 - if (r.Routing != null) s.Routing(r.Routing); - if (r.WaitForActiveShards.HasValue) s.WaitForActiveShards(r.WaitForActiveShards.ToString()); + if (request.Routing != null) s.Routing(request.Routing); + if (request.WaitForActiveShards.HasValue) s.WaitForActiveShards(request.WaitForActiveShards.ToString()); return s; }, _compositeCancelToken) @@ -134,29 +134,38 @@ private async Task BulkAsync(IList buffer, long page, int b if (!response.ApiCall.Success) return await HandleBulkRequest(buffer, page, backOffRetries, response); - var documentsWithResponse = response.Items.Zip(buffer, Tuple.Create).ToList(); + var successfulDocuments = new List>(); + var retryableDocuments = new List(); + var droppedDocuments = new List>(); - HandleDroppedDocuments(documentsWithResponse, response); + foreach (var documentWithResponse in response.Items.Zip(buffer, Tuple.Create)) + { + if (documentWithResponse.Item1.IsValid) + successfulDocuments.Add(documentWithResponse); + else + { + if (_retryPredicate(documentWithResponse.Item1, documentWithResponse.Item2)) + retryableDocuments.Add(documentWithResponse.Item2); + else + droppedDocuments.Add(documentWithResponse); + } + } - var retryDocuments = documentsWithResponse - .Where(x => !x.Item1.IsValid && _retryPredicate(x.Item1, x.Item2)) - .Select(x => x.Item2) - .ToList(); + HandleDroppedDocuments(droppedDocuments, response); - if (retryDocuments.Count > 0 && backOffRetries < _backOffRetries) - return await RetryDocuments(page, ++backOffRetries, retryDocuments); - else if (retryDocuments.Count > 0) + if (retryableDocuments.Count > 0 && backOffRetries < _backOffRetries) + return await RetryDocuments(page, ++backOffRetries, retryableDocuments).ConfigureAwait(false); + + if (retryableDocuments.Count > 0) throw ThrowOnBadBulk(response, $"Bulk indexing failed and after retrying {backOffRetries} times"); - _partitionedBulkRequest.BackPressure?.Release(); - return new BulkAllResponse { Retries = backOffRetries, Page = page }; + request.BackPressure?.Release(); + + return new BulkAllResponse { Retries = backOffRetries, Page = page, Items = response.Items }; } - private void HandleDroppedDocuments(List> documentsWithResponse, IBulkResponse response) + private void HandleDroppedDocuments(List> droppedDocuments, IBulkResponse response) { - var droppedDocuments = documentsWithResponse - .Where(x => !x.Item1.IsValid && !_retryPredicate(x.Item1, x.Item2)) - .ToList(); if (droppedDocuments.Count <= 0) return; foreach (var dropped in droppedDocuments) _droppedDocumentCallBack(dropped.Item1, dropped.Item2); @@ -185,7 +194,7 @@ private async Task HandleBulkRequest(IList buffer, long pag throw ThrowOnBadBulk(response, $"BulkAll halted after {nameof(PipelineFailure)}{failureReason.GetStringValue()} from _bulk"); default: - return await RetryDocuments(page, ++backOffRetries, buffer); + return await RetryDocuments(page, ++backOffRetries, buffer).ConfigureAwait(false); } } diff --git a/src/Nest/Document/Multiple/BulkAll/BulkAllResponse.cs b/src/Nest/Document/Multiple/BulkAll/BulkAllResponse.cs index b768847d466..700e60b7452 100644 --- a/src/Nest/Document/Multiple/BulkAll/BulkAllResponse.cs +++ b/src/Nest/Document/Multiple/BulkAll/BulkAllResponse.cs @@ -1,4 +1,6 @@ -using Newtonsoft.Json; +using System; +using System.Collections.Generic; +using Newtonsoft.Json; namespace Nest { @@ -12,6 +14,9 @@ public interface IBulkAllResponse /// The number of back off retries were needed to store this document. int Retries { get; } + + /// The items returned from the bulk response + IReadOnlyCollection Items { get; } } /// @@ -26,5 +31,8 @@ public class BulkAllResponse : IBulkAllResponse /// public int Retries { get; internal set; } + + /// + public IReadOnlyCollection Items { get; internal set; } } } diff --git a/src/Tests/Tests/Document/Multiple/BulkAll/BulkAndScrollApiTests.cs b/src/Tests/Tests/Document/Multiple/BulkAll/BulkAndScrollApiTests.cs index 7fbe603dda7..a8060ae8ae4 100644 --- a/src/Tests/Tests/Document/Multiple/BulkAll/BulkAndScrollApiTests.cs +++ b/src/Tests/Tests/Document/Multiple/BulkAll/BulkAndScrollApiTests.cs @@ -50,7 +50,7 @@ private void ScrollAll(string index, int size, int numberOfShards, int numberOfD seenDocuments.Should().Be(numberOfDocuments); var groups = seenSlices.GroupBy(s => s).ToList(); - groups.Count().Should().Be(numberOfShards); + groups.Count.Should().Be(numberOfShards); groups.Should().OnlyContain(g => g.Count() > 1); } @@ -70,7 +70,15 @@ private void BulkAll(string index, IEnumerable documents, int size, .Index(index) ); //we set up an observer - var bulkObserver = observableBulk.Wait(TimeSpan.FromMinutes(5), b => Interlocked.Increment(ref seenPages)); + var bulkObserver = observableBulk.Wait(TimeSpan.FromMinutes(5), b => + { + Interlocked.Increment(ref seenPages); + foreach (var item in b.Items) + { + item.IsValid.Should().BeTrue(); + item.Id.Should().NotBeNullOrEmpty(); + } + }); droppedDocuments.Take(10).Should().BeEmpty(); bulkObserver.TotalNumberOfFailedBuffers.Should().Be(0, "All buffers are expected to be indexed");