Skip to content

Commit

Permalink
Expose BulkResponseItems on BulkAllResponse (#3598)
Browse files Browse the repository at this point in the history
This commit exposes the bulk response items on the BulkAllResponse.

Closes #3487
  • Loading branch information
russcam committed Mar 20, 2019
1 parent 3d244b9 commit 2d620bc
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 26 deletions.
55 changes: 32 additions & 23 deletions src/Nest/Document/Multiple/BulkAll/BulkAllObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,18 +112,18 @@ private async Task<IBulkAllResponse> BulkAsync(IList<T> buffer, long page, int b
{
_compositeCancelToken.ThrowIfCancellationRequested();

var r = _partitionedBulkRequest;
var request = _partitionedBulkRequest;
var response = await _client.BulkAsync(s =>
{
s.Index(r.Index).Type(r.Type);
if (r.BufferToBulk != null) r.BufferToBulk(s, buffer);
s.Index(request.Index).Type(request.Type);
if (request.BufferToBulk != null) request.BufferToBulk(s, buffer);
else s.IndexMany(buffer);
if (!string.IsNullOrEmpty(r.Pipeline)) s.Pipeline(r.Pipeline);
if (!string.IsNullOrEmpty(request.Pipeline)) s.Pipeline(request.Pipeline);
#pragma warning disable 618
if (r.Refresh.HasValue) s.Refresh(r.Refresh.Value);
if (request.Refresh.HasValue) s.Refresh(request.Refresh.Value);
#pragma warning restore 618
if (r.Routing != null) s.Routing(r.Routing);
if (r.WaitForActiveShards.HasValue) s.WaitForActiveShards(r.WaitForActiveShards.ToString());
if (request.Routing != null) s.Routing(request.Routing);
if (request.WaitForActiveShards.HasValue) s.WaitForActiveShards(request.WaitForActiveShards.ToString());
return s;
}, _compositeCancelToken)
Expand All @@ -134,29 +134,38 @@ private async Task<IBulkAllResponse> BulkAsync(IList<T> buffer, long page, int b
if (!response.ApiCall.Success)
return await HandleBulkRequest(buffer, page, backOffRetries, response);

var documentsWithResponse = response.Items.Zip(buffer, Tuple.Create).ToList();
var successfulDocuments = new List<Tuple<IBulkResponseItem, T>>();
var retryableDocuments = new List<T>();
var droppedDocuments = new List<Tuple<IBulkResponseItem, T>>();

HandleDroppedDocuments(documentsWithResponse, response);
foreach (var documentWithResponse in response.Items.Zip(buffer, Tuple.Create))
{
if (documentWithResponse.Item1.IsValid)
successfulDocuments.Add(documentWithResponse);
else
{
if (_retryPredicate(documentWithResponse.Item1, documentWithResponse.Item2))
retryableDocuments.Add(documentWithResponse.Item2);
else
droppedDocuments.Add(documentWithResponse);
}
}

var retryDocuments = documentsWithResponse
.Where(x => !x.Item1.IsValid && _retryPredicate(x.Item1, x.Item2))
.Select(x => x.Item2)
.ToList();
HandleDroppedDocuments(droppedDocuments, response);

if (retryDocuments.Count > 0 && backOffRetries < _backOffRetries)
return await RetryDocuments(page, ++backOffRetries, retryDocuments);
else if (retryDocuments.Count > 0)
if (retryableDocuments.Count > 0 && backOffRetries < _backOffRetries)
return await RetryDocuments(page, ++backOffRetries, retryableDocuments).ConfigureAwait(false);

if (retryableDocuments.Count > 0)
throw ThrowOnBadBulk(response, $"Bulk indexing failed and after retrying {backOffRetries} times");

_partitionedBulkRequest.BackPressure?.Release();
return new BulkAllResponse { Retries = backOffRetries, Page = page };
request.BackPressure?.Release();

return new BulkAllResponse { Retries = backOffRetries, Page = page, Items = response.Items };
}

private void HandleDroppedDocuments(List<Tuple<IBulkResponseItem, T>> documentsWithResponse, IBulkResponse response)
private void HandleDroppedDocuments(List<Tuple<IBulkResponseItem, T>> droppedDocuments, IBulkResponse response)
{
var droppedDocuments = documentsWithResponse
.Where(x => !x.Item1.IsValid && !_retryPredicate(x.Item1, x.Item2))
.ToList();
if (droppedDocuments.Count <= 0) return;

foreach (var dropped in droppedDocuments) _droppedDocumentCallBack(dropped.Item1, dropped.Item2);
Expand Down Expand Up @@ -185,7 +194,7 @@ private async Task<IBulkAllResponse> HandleBulkRequest(IList<T> buffer, long pag
throw ThrowOnBadBulk(response,
$"BulkAll halted after {nameof(PipelineFailure)}{failureReason.GetStringValue()} from _bulk");
default:
return await RetryDocuments(page, ++backOffRetries, buffer);
return await RetryDocuments(page, ++backOffRetries, buffer).ConfigureAwait(false);
}
}

Expand Down
10 changes: 9 additions & 1 deletion src/Nest/Document/Multiple/BulkAll/BulkAllResponse.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using Newtonsoft.Json;

namespace Nest
{
Expand All @@ -12,6 +14,9 @@ public interface IBulkAllResponse

/// <summary>The number of back off retries were needed to store this document.</summary>
int Retries { get; }

/// <summary>The items returned from the bulk response</summary>
IReadOnlyCollection<IBulkResponseItem> Items { get; }
}

/// <inheritdoc />
Expand All @@ -26,5 +31,8 @@ public class BulkAllResponse : IBulkAllResponse

/// <inheritdoc />
public int Retries { get; internal set; }

/// <inheritdoc />
public IReadOnlyCollection<IBulkResponseItem> Items { get; internal set; }
}
}
12 changes: 10 additions & 2 deletions src/Tests/Tests/Document/Multiple/BulkAll/BulkAndScrollApiTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private void ScrollAll(string index, int size, int numberOfShards, int numberOfD

seenDocuments.Should().Be(numberOfDocuments);
var groups = seenSlices.GroupBy(s => s).ToList();
groups.Count().Should().Be(numberOfShards);
groups.Count.Should().Be(numberOfShards);
groups.Should().OnlyContain(g => g.Count() > 1);
}

Expand All @@ -70,7 +70,15 @@ private void BulkAll(string index, IEnumerable<SmallObject> documents, int size,
.Index(index)
);
//we set up an observer
var bulkObserver = observableBulk.Wait(TimeSpan.FromMinutes(5), b => Interlocked.Increment(ref seenPages));
var bulkObserver = observableBulk.Wait(TimeSpan.FromMinutes(5), b =>
{
Interlocked.Increment(ref seenPages);
foreach (var item in b.Items)
{
item.IsValid.Should().BeTrue();
item.Id.Should().NotBeNullOrEmpty();
}
});

droppedDocuments.Take(10).Should().BeEmpty();
bulkObserver.TotalNumberOfFailedBuffers.Should().Be(0, "All buffers are expected to be indexed");
Expand Down

0 comments on commit 2d620bc

Please sign in to comment.