Deprecate batch_size parameter on bulk API (#14725)
By default, the full _bulk payload is now passed to ingest processors as a single
batch; any sub-batching logic is left to each processor to implement if necessary.

Signed-off-by: Liyun Xiu <xiliyun@amazon.com>
chishui authored Jul 22, 2024
1 parent 5845106 commit 97f26cc
Showing 7 changed files with 141 additions and 101 deletions.
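The behavioral shift is that the ingest service no longer slices the payload itself: a processor that cannot handle arbitrarily large batches is expected to chunk internally. Below is a minimal sketch of such sub-batching, assuming the `Processor#batchExecute(List, Consumer)` hook exercised by the tests in this diff; the class name and chunk size are illustrative, not part of this change.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.IngestDocumentWrapper;

// Hypothetical processor that sub-batches internally; CHUNK_SIZE is illustrative.
public class ChunkingProcessor extends AbstractProcessor {

    private static final int CHUNK_SIZE = 128;

    ChunkingProcessor(String tag, String description) {
        super(tag, description);
    }

    @Override
    public IngestDocument execute(IngestDocument document) {
        return document; // single-document path: nothing to do in this sketch
    }

    @Override
    public void batchExecute(List<IngestDocumentWrapper> docs, Consumer<List<IngestDocumentWrapper>> handler) {
        List<IngestDocumentWrapper> results = new ArrayList<>(docs.size());
        for (int from = 0; from < docs.size(); from += CHUNK_SIZE) {
            // Process one bounded chunk at a time (e.g. one remote call per chunk).
            for (IngestDocumentWrapper wrapper : docs.subList(from, Math.min(from + CHUNK_SIZE, docs.size()))) {
                execute(wrapper.getIngestDocument());
                results.add(wrapper);
            }
        }
        handler.accept(results); // report all documents back in a single callback
    }

    @Override
    public String getType() {
        return "chunking";
    }
}
```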
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -60,6 +60,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow system index warning in OpenSearchRestTestCase.refreshAllIndices ([#14635](https://github.com/opensearch-project/OpenSearch/pull/14635))

### Deprecated
- Deprecate batch_size parameter on bulk API ([#14725](https://github.com/opensearch-project/OpenSearch/pull/14725))

### Removed
- Remove query categorization changes ([#14759](https://github.com/opensearch-project/OpenSearch/pull/14759))
@@ -207,15 +207,14 @@ teardown:
- match: { _source: {"f1": "v2", "f2": 47, "field1": "value1", "field2": "value2"}}

---
"Test bulk API with batch enabled happy case":
"Test bulk API with default batch size":
- skip:
version: " - 2.13.99"
reason: "Added in 2.14.0"

- do:
bulk:
refresh: true
-        batch_size: 2
pipeline: "pipeline1"
body:
- '{"index": {"_index": "test_index", "_id": "test_id1"}}'
@@ -245,36 +244,6 @@ teardown:
id: test_id3
- match: { _source: { "text": "text3", "field1": "value1" } }

----
-"Test bulk API with batch_size missing":
-  - skip:
-      version: " - 2.13.99"
-      reason: "Added in 2.14.0"
-
-  - do:
-      bulk:
-        refresh: true
-        pipeline: "pipeline1"
-        body:
-          - '{"index": {"_index": "test_index", "_id": "test_id1"}}'
-          - '{"text": "text1"}'
-          - '{"index": {"_index": "test_index", "_id": "test_id2"}}'
-          - '{"text": "text2"}'
-
-  - match: { errors: false }
-
-  - do:
-      get:
-        index: test_index
-        id: test_id1
-  - match: { _source: { "text": "text1", "field1": "value1" } }
-
-  - do:
-      get:
-        index: test_index
-        id: test_id2
-  - match: { _source: { "text": "text2", "field1": "value1" } }

---
"Test bulk API with invalid batch_size":
- skip:
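For clients, passing the deprecated parameter still works but now draws a warning. A sketch using the low-level Java REST client follows; the host, index, and pipeline names are illustrative, and deprecation messages are surfaced through the `Warning` response header:

```java
import java.io.IOException;

import org.apache.http.HttpHost;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.RestClient;

public class DeprecatedBatchSizeDemo {
    public static void main(String[] args) throws IOException {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            Request request = new Request("POST", "/_bulk");
            request.addParameter("pipeline", "pipeline1");
            request.addParameter("batch_size", "2"); // deprecated: still honored, but warns
            request.setJsonEntity(
                "{\"index\":{\"_index\":\"test_index\",\"_id\":\"test_id1\"}}\n"
                    + "{\"text\":\"text1\"}\n"
                    + "{\"index\":{\"_index\":\"test_index\",\"_id\":\"test_id2\"}}\n"
                    + "{\"text\":\"text2\"}\n"
            );
            Response response = client.performRequest(request);
            // Deprecation notices arrive in the Warning response header.
            response.getWarnings().forEach(System.out::println);
        }
    }
}
```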
@@ -315,6 +315,87 @@ public void testBulkWithUpsert() throws Exception {
assertThat(upserted.get("processed"), equalTo(true));
}

public void testSingleDocIngestFailure() throws Exception {
createIndex("test");
BytesReference source = BytesReference.bytes(
jsonBuilder().startObject()
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
.startObject("test")
.endObject()
.endObject()
.endArray()
.endObject()
);
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, MediaTypeRegistry.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();

GetPipelineRequest getPipelineRequest = new GetPipelineRequest("_id");
GetPipelineResponse getResponse = client().admin().cluster().getPipeline(getPipelineRequest).get();
assertThat(getResponse.isFound(), is(true));
assertThat(getResponse.pipelines().size(), equalTo(1));
assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id"));

assertThrows(
IllegalArgumentException.class,
() -> client().prepareIndex("test")
.setId("1")
.setPipeline("_id")
.setSource(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", true)
.get()
);

DeletePipelineRequest deletePipelineRequest = new DeletePipelineRequest("_id");
AcknowledgedResponse response = client().admin().cluster().deletePipeline(deletePipelineRequest).get();
assertThat(response.isAcknowledged(), is(true));

getResponse = client().admin().cluster().prepareGetPipeline("_id").get();
assertThat(getResponse.isFound(), is(false));
assertThat(getResponse.pipelines().size(), equalTo(0));
}

public void testSingleDocIngestDrop() throws Exception {
createIndex("test");
BytesReference source = BytesReference.bytes(
jsonBuilder().startObject()
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
.startObject("test")
.endObject()
.endObject()
.endArray()
.endObject()
);
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, MediaTypeRegistry.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();

GetPipelineRequest getPipelineRequest = new GetPipelineRequest("_id");
GetPipelineResponse getResponse = client().admin().cluster().getPipeline(getPipelineRequest).get();
assertThat(getResponse.isFound(), is(true));
assertThat(getResponse.pipelines().size(), equalTo(1));
assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id"));

DocWriteResponse indexResponse = client().prepareIndex("test")
.setId("1")
.setPipeline("_id")
.setSource(Requests.INDEX_CONTENT_TYPE, "field", "value", "drop", true)
.get();
assertEquals(DocWriteResponse.Result.NOOP, indexResponse.getResult());

Map<String, Object> doc = client().prepareGet("test", "1").get().getSourceAsMap();
assertNull(doc);

DeletePipelineRequest deletePipelineRequest = new DeletePipelineRequest("_id");
AcknowledgedResponse response = client().admin().cluster().deletePipeline(deletePipelineRequest).get();
assertThat(response.isAcknowledged(), is(true));

getResponse = client().admin().cluster().prepareGetPipeline("_id").get();
assertThat(getResponse.isFound(), is(false));
assertThat(getResponse.pipelines().size(), equalTo(0));
}

public void test() throws Exception {
BytesReference source = BytesReference.bytes(
jsonBuilder().startObject()
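Both new tests install a pipeline whose single processor has type `test`. That factory is not part of this diff; a plausible reconstruction, an assumption for illustration only, reacts to `fail` and `drop` flags in the document source:

```java
import java.util.Map;

import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.IngestDocument;

// Assumed shape of the test-only processor: honors "fail" and "drop" source flags.
public class TestFlagProcessor extends AbstractProcessor {

    TestFlagProcessor(String tag, String description) {
        super(tag, description);
    }

    @Override
    public IngestDocument execute(IngestDocument document) {
        Map<String, Object> source = document.getSourceAndMetadata();
        if (Boolean.TRUE.equals(source.get("fail"))) {
            // Surfaces to the client as the IllegalArgumentException asserted above.
            throw new IllegalArgumentException("test processor failed");
        }
        if (Boolean.TRUE.equals(source.get("drop"))) {
            return null; // returning null drops the document, producing a NOOP result
        }
        return document;
    }

    @Override
    public String getType() {
        return "test";
    }
}
```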
@@ -96,7 +96,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
private String globalRouting;
private String globalIndex;
private Boolean globalRequireAlias;
-    private int batchSize = 1;
+    private int batchSize = Integer.MAX_VALUE;

private long sizeInBytes = 0;

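With the default raised from 1 to Integer.MAX_VALUE, a request that never sets `batch_size` is effectively one batch; IngestService clamps the value to the request size (see the `Math.min` change below). A quick sketch of the observable accessor behavior, using the getter/setter pair shown in this hunk:

```java
import org.opensearch.action.bulk.BulkRequest;

public class BatchSizeDefaultDemo {
    public static void main(String[] args) {
        BulkRequest bulkRequest = new BulkRequest();
        // New default: effectively "no sub-batching requested" (was 1 before this change).
        System.out.println(bulkRequest.batchSize()); // 2147483647

        // Callers using the deprecated REST parameter still reach this setter.
        bulkRequest.batchSize(2);
        System.out.println(bulkRequest.batchSize()); // 2
    }
}
```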
64 changes: 3 additions & 61 deletions server/src/main/java/org/opensearch/ingest/IngestService.java
@@ -525,61 +525,7 @@ public void onFailure(Exception e) {

@Override
protected void doRun() {
-                int batchSize = originalBulkRequest.batchSize();
-                if (shouldExecuteBulkRequestInBatch(originalBulkRequest.requests().size(), batchSize)) {
-                    runBulkRequestInBatch(numberOfActionRequests, actionRequests, onFailure, onCompletion, onDropped, originalBulkRequest);
-                    return;
-                }
-
-                final Thread originalThread = Thread.currentThread();
-                final AtomicInteger counter = new AtomicInteger(numberOfActionRequests);
-                int i = 0;
-                for (DocWriteRequest<?> actionRequest : actionRequests) {
-                    IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest);
-                    if (indexRequest == null) {
-                        if (counter.decrementAndGet() == 0) {
-                            onCompletion.accept(originalThread, null);
-                        }
-                        assert counter.get() >= 0;
-                        i++;
-                        continue;
-                    }
-                    final String pipelineId = indexRequest.getPipeline();
-                    indexRequest.setPipeline(NOOP_PIPELINE_NAME);
-                    final String finalPipelineId = indexRequest.getFinalPipeline();
-                    indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME);
-                    boolean hasFinalPipeline = true;
-                    final List<String> pipelines;
-                    if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false
-                        && IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) {
-                        pipelines = Arrays.asList(pipelineId, finalPipelineId);
-                    } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false) {
-                        pipelines = Collections.singletonList(pipelineId);
-                        hasFinalPipeline = false;
-                    } else if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) {
-                        pipelines = Collections.singletonList(finalPipelineId);
-                    } else {
-                        if (counter.decrementAndGet() == 0) {
-                            onCompletion.accept(originalThread, null);
-                        }
-                        assert counter.get() >= 0;
-                        i++;
-                        continue;
-                    }
-
-                    executePipelines(
-                        i,
-                        pipelines.iterator(),
-                        hasFinalPipeline,
-                        indexRequest,
-                        onDropped,
-                        onFailure,
-                        counter,
-                        onCompletion,
-                        originalThread
-                    );
-                    i++;
-                }
+                runBulkRequestInBatch(numberOfActionRequests, actionRequests, onFailure, onCompletion, onDropped, originalBulkRequest);
}
});
}
@@ -635,7 +581,7 @@ private void runBulkRequestInBatch(
i++;
}

-        int batchSize = originalBulkRequest.batchSize();
+        int batchSize = Math.min(numberOfActionRequests, originalBulkRequest.batchSize());
List<List<IndexRequestWrapper>> batches = prepareBatches(batchSize, indexRequestWrappers);
logger.debug("batchSize: {}, batches: {}", batchSize, batches.size());

@@ -654,10 +600,6 @@
}
}

-    private boolean shouldExecuteBulkRequestInBatch(int documentSize, int batchSize) {
-        return documentSize > 1 && batchSize > 1;
-    }

/**
* IndexRequests are grouped by unique (index + pipeline_ids) before batching.
* Only IndexRequests in the same group could be batched. It's to ensure batched documents always
@@ -685,7 +627,7 @@ static List<List<IndexRequestWrapper>> prepareBatches(int batchSize, List<IndexR
}
List<List<IndexRequestWrapper>> batchedIndexRequests = new ArrayList<>();
for (Map.Entry<Integer, List<IndexRequestWrapper>> indexRequestsPerKey : indexRequestsPerIndexAndPipelines.entrySet()) {
-            for (int i = 0; i < indexRequestsPerKey.getValue().size(); i += batchSize) {
+            for (int i = 0; i < indexRequestsPerKey.getValue().size(); i += Math.min(indexRequestsPerKey.getValue().size(), batchSize)) {
batchedIndexRequests.add(
new ArrayList<>(
indexRequestsPerKey.getValue().subList(i, i + Math.min(batchSize, indexRequestsPerKey.getValue().size() - i))
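Per the javadoc above, requests are grouped by (index + pipeline ids) so a batch never mixes documents bound for different pipelines, and each group is then cut into slices of at most `batchSize`. A standalone sketch of that grouping-then-slicing logic, with types simplified for illustration:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PrepareBatchesSketch {

    // Simplified stand-in for IndexRequestWrapper: only the grouping key and an id.
    record Doc(String index, List<String> pipelines, String id) {}

    static List<List<Doc>> prepareBatches(int batchSize, List<Doc> docs) {
        // Group by (index + pipeline ids): only same-group documents may share a batch.
        Map<String, List<Doc>> groups = new LinkedHashMap<>();
        for (Doc doc : docs) {
            String key = doc.index() + "|" + String.join(",", doc.pipelines());
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(doc);
        }
        // Cut each group into slices of at most batchSize documents.
        List<List<Doc>> batches = new ArrayList<>();
        for (List<Doc> group : groups.values()) {
            int step = Math.min(batchSize, group.size()); // mirrors the overflow-safe increment above
            for (int i = 0; i < group.size(); i += step) {
                batches.add(new ArrayList<>(group.subList(i, Math.min(i + step, group.size()))));
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Doc> docs = List.of(
            new Doc("logs", List.of("p1"), "1"),
            new Doc("logs", List.of("p1"), "2"),
            new Doc("logs", List.of("p2"), "3")
        );
        System.out.println(prepareBatches(Integer.MAX_VALUE, docs).size()); // 2 batches, one per group
    }
}
```

With the new default of Integer.MAX_VALUE, the slice size degenerates to the group size, so each (index, pipelines) group becomes exactly one batch.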
@@ -38,6 +38,7 @@
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.client.Requests;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.logging.DeprecationLogger;
import org.opensearch.common.settings.Settings;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
@@ -66,6 +67,8 @@
public class RestBulkAction extends BaseRestHandler {

private final boolean allowExplicitIndex;
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestBulkAction.class);
static final String BATCH_SIZE_DEPRECATED_MESSAGE = "The batch size option in bulk API is deprecated and will be removed in 3.0.";

public RestBulkAction(Settings settings) {
this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
@@ -97,7 +100,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null);
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));
-        bulkRequest.batchSize(request.paramAsInt("batch_size", 1));
+        if (request.hasParam("batch_size")) {
+            deprecationLogger.deprecate("batch_size_deprecation", BATCH_SIZE_DEPRECATED_MESSAGE);
+        }
+        bulkRequest.batchSize(request.paramAsInt("batch_size", Integer.MAX_VALUE));
bulkRequest.add(
request.requiredContent(),
defaultIndex,
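`DeprecationLogger.deprecate` takes a deduplication key (`"batch_size_deprecation"`) alongside the message, so repeated bulk calls should not flood the log, and the message is relayed to callers as a `Warning` header. A hedged sketch of how this could be asserted in a unit test, assuming the test framework's `FakeRestRequest` builder and `assertWarnings` helper; the dispatch step is elided:

```java
// Same package as RestBulkAction so the package-private constant is visible (an assumption).
package org.opensearch.rest.action.document;

import java.util.HashMap;
import java.util.Map;

import org.opensearch.rest.RestRequest;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.rest.FakeRestRequest;

public class RestBulkActionDeprecationTests extends OpenSearchTestCase {

    public void testBatchSizeParamWarns() {
        Map<String, String> params = new HashMap<>();
        params.put("batch_size", "2");
        RestRequest request = new FakeRestRequest.Builder(xContentRegistry())
            .withPath("/_bulk")
            .withParams(params)
            .build();
        // ... dispatch `request` through RestBulkAction#prepareRequest here ...
        assertWarnings(RestBulkAction.BATCH_SIZE_DEPRECATED_MESSAGE);
    }
}
```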
53 changes: 47 additions & 6 deletions server/src/test/java/org/opensearch/ingest/IngestServiceTests.java
@@ -1134,10 +1134,14 @@ public void testBulkRequestExecutionWithFailures() throws Exception {
Exception error = new RuntimeException();
doAnswer(args -> {
@SuppressWarnings("unchecked")
-            BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
-            handler.accept(null, error);
+            List<IngestDocumentWrapper> ingestDocumentWrappers = (List) args.getArguments()[0];
+            Consumer<List<IngestDocumentWrapper>> handler = (Consumer) args.getArguments()[1];
+            for (IngestDocumentWrapper wrapper : ingestDocumentWrappers) {
+                wrapper.update(wrapper.getIngestDocument(), error);
+            }
+            handler.accept(ingestDocumentWrappers);
return null;
-        }).when(processor).execute(any(), any());
+        }).when(processor).batchExecute(any(), any());
IngestService ingestService = createWithProcessors(
Collections.singletonMap("mock", (factories, tag, description, config) -> processor)
);
@@ -1192,10 +1196,11 @@ public void testBulkRequestExecution() throws Exception {
when(processor.getTag()).thenReturn("mockTag");
doAnswer(args -> {
@SuppressWarnings("unchecked")
-            BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
-            handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null);
+            List<IngestDocumentWrapper> ingestDocumentWrappers = (List) args.getArguments()[0];
+            Consumer<List<IngestDocumentWrapper>> handler = (Consumer) args.getArguments()[1];
+            handler.accept(ingestDocumentWrappers);
return null;
-        }).when(processor).execute(any(), any());
+        }).when(processor).batchExecute(any(), any());
Map<String, Processor.Factory> map = new HashMap<>(2);
map.put("mock", (factories, tag, description, config) -> processor);

@@ -1957,6 +1962,42 @@ public void testExecuteBulkRequestInBatchWithExceptionAndDropInCallback() {
verify(mockCompoundProcessor, never()).execute(any(), any());
}

public void testExecuteBulkRequestInBatchWithDefaultBatchSize() {
CompoundProcessor mockCompoundProcessor = mockCompoundProcessor();
IngestService ingestService = createWithProcessors(
Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor)
);
createPipeline("_id", ingestService);
BulkRequest bulkRequest = new BulkRequest();
IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
bulkRequest.add(indexRequest1);
IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
bulkRequest.add(indexRequest2);
IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3").source(emptyMap()).setPipeline("_none").setFinalPipeline("_id");
bulkRequest.add(indexRequest3);
IndexRequest indexRequest4 = new IndexRequest("_index").id("_id4").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
bulkRequest.add(indexRequest4);
@SuppressWarnings("unchecked")
final Map<Integer, Exception> failureHandler = new HashMap<>();
final Map<Thread, Exception> completionHandler = new HashMap<>();
final List<Integer> dropHandler = new ArrayList<>();
ingestService.executeBulkRequest(
4,
bulkRequest.requests(),
failureHandler::put,
completionHandler::put,
dropHandler::add,
Names.WRITE,
bulkRequest
);
assertTrue(failureHandler.isEmpty());
assertTrue(dropHandler.isEmpty());
assertEquals(1, completionHandler.size());
assertNull(completionHandler.get(Thread.currentThread()));
verify(mockCompoundProcessor, times(1)).batchExecute(any(), any());
verify(mockCompoundProcessor, never()).execute(any(), any());
}

public void testPrepareBatches_same_index_pipeline() {
IngestService.IndexRequestWrapper wrapper1 = createIndexRequestWrapper("index1", Collections.singletonList("p1"));
IngestService.IndexRequestWrapper wrapper2 = createIndexRequestWrapper("index1", Collections.singletonList("p1"));
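The stubs above answer `batchExecute` rather than `execute` because the batch path is now always taken. For context, here is a simplified per-document fallback, roughly what a processor without native batch support can do; this is an assumption, not a quote of the project's default implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

import org.opensearch.ingest.IngestDocumentWrapper;
import org.opensearch.ingest.Processor;

public final class BatchFallback {

    private BatchFallback() {}

    // Drive the single-document execute() for each wrapper, then hand every
    // document back through the batch handler in one callback.
    // Sketch assumes the per-document callback runs synchronously.
    public static void batchExecute(Processor processor, List<IngestDocumentWrapper> docs,
                                    Consumer<List<IngestDocumentWrapper>> handler) {
        List<IngestDocumentWrapper> results = new ArrayList<>(docs.size());
        for (IngestDocumentWrapper wrapper : docs) {
            processor.execute(wrapper.getIngestDocument(), (doc, e) -> {
                wrapper.update(doc, e); // doc may be null on drop; e is non-null on failure
                results.add(wrapper);
                if (results.size() == docs.size()) {
                    handler.accept(results); // all documents accounted for
                }
            });
        }
    }
}
```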
