Skip to content

Commit

Permalink
Adding a simulate ingest API
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Oct 17, 2023
1 parent 1773d4b commit be5f759
Show file tree
Hide file tree
Showing 14 changed files with 853 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,10 @@
import org.elasticsearch.action.admin.indices.validate.query.TransportValidateQueryAction;
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.SimulateBulkAction;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.bulk.TransportSimulateBulkAction;
import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.delete.TransportDeleteAction;
import org.elasticsearch.action.explain.ExplainAction;
Expand Down Expand Up @@ -444,6 +446,7 @@
import org.elasticsearch.rest.action.ingest.RestDeletePipelineAction;
import org.elasticsearch.rest.action.ingest.RestGetPipelineAction;
import org.elasticsearch.rest.action.ingest.RestPutPipelineAction;
import org.elasticsearch.rest.action.ingest.RestSimulateIngestAction;
import org.elasticsearch.rest.action.ingest.RestSimulatePipelineAction;
import org.elasticsearch.rest.action.search.RestClearScrollAction;
import org.elasticsearch.rest.action.search.RestCountAction;
Expand Down Expand Up @@ -763,6 +766,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class);
actions.register(TransportShardMultiGetAction.TYPE, TransportShardMultiGetAction.class);
actions.register(BulkAction.INSTANCE, TransportBulkAction.class);
actions.register(SimulateBulkAction.INSTANCE, TransportSimulateBulkAction.class);
actions.register(TransportShardBulkAction.TYPE, TransportShardBulkAction.class);
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
Expand Down Expand Up @@ -949,6 +953,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestGetComposableIndexTemplateAction());
registerHandler.accept(new RestDeleteComposableIndexTemplateAction());
registerHandler.accept(new RestSimulateIndexTemplateAction());
registerHandler.accept(new RestSimulateIngestAction());
registerHandler.accept(new RestSimulateTemplateAction());

registerHandler.accept(new RestPutMappingAction());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.ActionType;
import org.elasticsearch.transport.TransportRequestOptions;

/**
 * Action type for the simulated bulk operation. It is identified by {@link #NAME} and its
 * responses are read as {@link BulkResponse} instances.
 */
public class SimulateBulkAction extends ActionType<BulkResponse> {

    /** The name under which this action type is created. */
    public static final String NAME = "indices:admin/simulate/bulk";

    /** The only instance of this action type; the constructor is private. */
    public static final SimulateBulkAction INSTANCE = new SimulateBulkAction();

    // Bulk-typed transport request options. Nothing inside this class reads this constant.
    private static final TransportRequestOptions TRANSPORT_REQUEST_OPTIONS = TransportRequestOptions.of(
        null,
        TransportRequestOptions.Type.BULK
    );

    private SimulateBulkAction() {
        super(NAME, BulkResponse::new);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.Pipeline;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
* This extends BulkRequest with support for providing substitute pipeline definitions.
*/
public class SimulateBulkRequest extends BulkRequest {
// Non-private for unit testing
Map<String, Object> pipelineSubstitutions = Map.of();

public SimulateBulkRequest() {
super();
}

public SimulateBulkRequest(StreamInput in) throws IOException {
super(in);
this.pipelineSubstitutions = in.readMap();
}

public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeGenericMap(pipelineSubstitutions);
}

public void setPipelineSubstitutions(Map<String, Object> pipelineSubstitutions) {
this.pipelineSubstitutions = pipelineSubstitutions;
}

@SuppressWarnings("unchecked")
public Map<String, Pipeline> getPipelineSubstitutions(IngestService ingestService) throws Exception {
Map<String, Pipeline> parsedPipelineSubstitutions = new HashMap<>();
if (pipelineSubstitutions != null) {
for (Map.Entry<String, Object> entry : pipelineSubstitutions.entrySet()) {
String pipelineId = entry.getKey();
Pipeline pipeline = Pipeline.create(
pipelineId,
(Map<String, Object>) entry.getValue(),
ingestService.getProcessorFactories(),
ingestService.getScriptService()
);
parsedPipelineSubstitutions.put(pipelineId, pipeline);
}
}
return parsedPipelineSubstitutions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,35 @@ public TransportBulkAction(
SystemIndices systemIndices,
LongSupplier relativeTimeProvider
) {
super(BulkAction.NAME, transportService, actionFilters, BulkRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this(
BulkAction.NAME,
threadPool,
transportService,
clusterService,
ingestService,
client,
actionFilters,
indexNameExpressionResolver,
indexingPressure,
systemIndices,
relativeTimeProvider
);
}

TransportBulkAction(
String actionName,
ThreadPool threadPool,
TransportService transportService,
ClusterService clusterService,
IngestService ingestService,
NodeClient client,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
IndexingPressure indexingPressure,
SystemIndices systemIndices,
LongSupplier relativeTimeProvider
) {
super(actionName, transportService, actionFilters, BulkRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
Objects.requireNonNull(relativeTimeProvider);
this.threadPool = threadPool;
this.clusterService = clusterService;
Expand Down Expand Up @@ -336,6 +364,19 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
}

// Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
indexData(task, bulkRequest, executorName, listener, responses, autoCreateIndices, indicesThatCannotBeCreated, startTime);
}

protected void indexData(
Task task,
BulkRequest bulkRequest,
String executorName,
ActionListener<BulkResponse> listener,
AtomicArray<BulkItemResponse> responses,
Set<String> autoCreateIndices,
Map<String, IndexNotFoundException> indicesThatCannotBeCreated,
long startTime
) {
if (autoCreateIndices.isEmpty()) {
executeBulk(task, bulkRequest, startTime, listener, executorName, responses, indicesThatCannotBeCreated);
} else {
Expand Down Expand Up @@ -386,6 +427,10 @@ protected void doRun() {
}
}

/**
 * Supplies the {@link IngestService} used to run ingest pipelines for the given bulk request.
 * This implementation does not look at the request and always hands back the ingest service
 * held by this action.
 *
 * @param request the bulk request being processed; not consulted here
 * @return the ingest service held by this action
 */
protected IngestService getIngestService(BulkRequest request) {
    return this.ingestService;
}

static void prohibitAppendWritesInBackingIndices(DocWriteRequest<?> writeRequest, Metadata metadata) {
DocWriteRequest.OpType opType = writeRequest.opType();
if ((opType == OpType.CREATE || opType == OpType.INDEX) == false) {
Expand Down Expand Up @@ -491,7 +536,7 @@ private static boolean setResponseFailureIfIndexMatches(
return false;
}

private long buildTookInMillis(long startTimeNanos) {
protected long buildTookInMillis(long startTimeNanos) {
    // Elapsed time is the current relativeTime() reading minus the supplied start, both in nanoseconds.
    final long elapsedNanos = relativeTime() - startTimeNanos;
    // Convert the nanosecond interval to milliseconds for the caller.
    return TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
}

Expand Down Expand Up @@ -809,7 +854,7 @@ private void processBulkIndexIngestRequest(
) {
final long ingestStartTimeInNanos = System.nanoTime();
final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
ingestService.executeBulkRequest(
getIngestService(original).executeBulkRequest(
original.numberOfActions(),
() -> bulkRequestModifier,
bulkRequestModifier::markItemAsDropped,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.SimulateIndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.SimulateIngestService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.Map;
import java.util.Set;

/**
 * A {@link TransportBulkAction} created under the name {@link SimulateBulkAction#NAME}. It overrides
 * the indexing step so that, instead of delegating to the inherited implementation, it answers every
 * item of the bulk request with a {@link SimulateIndexResponse}, and it wraps the ingest service in a
 * {@link SimulateIngestService} for each request.
 */
public class TransportSimulateBulkAction extends TransportBulkAction {

    @Inject
    public TransportSimulateBulkAction(
        ThreadPool threadPool,
        TransportService transportService,
        ClusterService clusterService,
        IngestService ingestService,
        NodeClient client,
        ActionFilters actionFilters,
        IndexNameExpressionResolver indexNameExpressionResolver,
        IndexingPressure indexingPressure,
        SystemIndices systemIndices
    ) {
        // Delegates to the superclass constructor that accepts an action name, using System::nanoTime
        // as the relative time provider.
        super(
            SimulateBulkAction.NAME,
            threadPool,
            transportService,
            clusterService,
            ingestService,
            client,
            actionFilters,
            indexNameExpressionResolver,
            indexingPressure,
            systemIndices,
            System::nanoTime
        );
    }

    /**
     * Fills {@code responses} with one successful item per request and completes the listener with
     * the resulting {@link BulkResponse}. The inherited implementation is not invoked.
     * <p>
     * Each item is reported with item id {@code 0} and op type {@code CREATE}, and carries the request's
     * index, source, content type and executed pipelines.
     * <p>
     * NOTE(review): every item is cast to {@link IndexRequest}, so any other kind of request in the bulk
     * would raise a {@link ClassCastException}; confirm that callers only submit index requests.
     */
    @Override
    protected void indexData(
        Task task,
        BulkRequest bulkRequest,
        String executorName,
        ActionListener<BulkResponse> listener,
        AtomicArray<BulkItemResponse> responses,
        Set<String> autoCreateIndices,
        Map<String, IndexNotFoundException> indicesThatCannotBeCreated,
        long startTime
    ) {
        final int itemCount = bulkRequest.requests.size();
        for (int slot = 0; slot < itemCount; slot++) {
            final DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(slot);
            final String targetIndex = docWriteRequest.index();
            final IndexRequest indexRequest = (IndexRequest) docWriteRequest;
            final SimulateIndexResponse simulatedResponse = new SimulateIndexResponse(
                targetIndex,
                indexRequest.source(),
                indexRequest.getContentType(),
                indexRequest.getExecutedPipelines()
            );
            responses.set(slot, BulkItemResponse.success(0, DocWriteRequest.OpType.CREATE, simulatedResponse));
        }
        final BulkItemResponse[] items = responses.toArray(new BulkItemResponse[responses.length()]);
        listener.onResponse(new BulkResponse(items, buildTookInMillis(startTime)));
    }

    /**
     * Wraps the ingest service supplied by the superclass, together with the request, in a new
     * {@link SimulateIngestService}.
     *
     * @param request the bulk request being processed
     * @return a new {@link SimulateIngestService} built from the inherited ingest service and the request
     */
    @Override
    protected IngestService getIngestService(BulkRequest request) {
        return new SimulateIngestService(super.getIngestService(request), request);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class IndexResponse extends DocWriteResponse {
* information about the pipelines executed. An empty list means that there were no pipelines executed.
*/
@Nullable
private final List<String> executedPipelines;
protected final List<String> executedPipelines;

public IndexResponse(ShardId shardId, StreamInput in) throws IOException {
super(shardId, in);
Expand Down
Loading

0 comments on commit be5f759

Please sign in to comment.