diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java index 088a3da08f051..cece71c2e0c25 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.ingest.PutPipelineRequest; @@ -263,6 +264,12 @@ public void sendRequest(Transport.Connection connection, long requestId, String // one failure assertThat(response1.v2() != null, is(response2.v2() == null)); + client().admin().indices().prepareRefresh("test").get(); + + GetResponse getResponse = client().prepareGet("test", "1").get(); +// sadly this fails... +// assertThat(getResponse.isExists(), is(true)); + } Tuple resultOrException(ActionFuture future) { diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 5b24e097bd866..bb32f10c3d6f1 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -187,9 +187,11 @@ import org.elasticsearch.action.admin.indices.validate.query.TransportValidateQueryAction; import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction; import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.ShardMarkCommitAndIndexAction; import org.elasticsearch.action.bulk.ShardPrepareCommitAction; import org.elasticsearch.action.bulk.TransportBulkAction; import org.elasticsearch.action.bulk.TransportShardBulkAction; +import org.elasticsearch.action.bulk.TransportShardMarkCommitAndIndexAction; import org.elasticsearch.action.bulk.TransportShardPrepareCommitAction; import org.elasticsearch.action.datastreams.ModifyDataStreamsAction; import org.elasticsearch.action.datastreams.ModifyDataStreamsTransportAction; @@ -604,6 +606,7 @@ public void reg actions.register(BulkAction.INSTANCE, TransportBulkAction.class); actions.register(TransportShardBulkAction.TYPE, TransportShardBulkAction.class); actions.register(ShardPrepareCommitAction.INSTANCE, TransportShardPrepareCommitAction.class); + actions.register(ShardMarkCommitAndIndexAction.INSTANCE, TransportShardMarkCommitAndIndexAction.class); actions.register(SearchAction.INSTANCE, TransportSearchAction.class); actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class); actions.register(OpenPointInTimeAction.INSTANCE, TransportOpenPointInTimeAction.class); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/ShardMarkCommitAndIndexAction.java b/server/src/main/java/org/elasticsearch/action/bulk/ShardMarkCommitAndIndexAction.java new file mode 100644 index 0000000000000..87bf775f2876a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/bulk/ShardMarkCommitAndIndexAction.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.action.ActionType; + +public class ShardMarkCommitAndIndexAction extends ActionType { + + public static final ShardMarkCommitAndIndexAction INSTANCE = new ShardMarkCommitAndIndexAction(); + // todo: should the name be bulk[prepare] style? + public static final String NAME = "indices:data/write/commit"; + + private ShardMarkCommitAndIndexAction() { + super(NAME, ShardMarkCommitAndIndexResponse::new); + } + + // todo: transport options? +} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/ShardMarkCommitAndIndexRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/ShardMarkCommitAndIndexRequest.java new file mode 100644 index 0000000000000..edbab74890650 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/bulk/ShardMarkCommitAndIndexRequest.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; + +// todo: make this a per node request. +public class ShardMarkCommitAndIndexRequest extends ReplicatedWriteRequest { + private final TxID txID; + + public ShardMarkCommitAndIndexRequest(ShardId shardId, TxID txID) { + super(shardId); + this.txID = txID; + } + + public ShardMarkCommitAndIndexRequest(StreamInput in) throws IOException { + super(in); + this.txID = new TxID(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + txID.writeTo(out); + } + + public TxID txid() { + return txID; + } + + @Override + public String toString() { + return "[" + shardId + "," + txID + "]"; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/ShardMarkCommitAndIndexResponse.java b/server/src/main/java/org/elasticsearch/action/bulk/ShardMarkCommitAndIndexResponse.java new file mode 100644 index 0000000000000..3f5a8ec31e9c7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/bulk/ShardMarkCommitAndIndexResponse.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.action.support.WriteResponse; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Map; + +public class ShardMarkCommitAndIndexResponse extends ReplicationResponse implements WriteResponse { + + + public ShardMarkCommitAndIndexResponse() { + } + + public ShardMarkCommitAndIndexResponse(StreamInput in) throws IOException { + super(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + + @Override + public void setForcedRefresh(boolean forcedRefresh) { + // this does not refresh currently. + throw new UnsupportedOperationException(); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/ShardPrepareCommitRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/ShardPrepareCommitRequest.java index 4959578e365ca..3fdd85d726b18 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/ShardPrepareCommitRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/ShardPrepareCommitRequest.java @@ -17,7 +17,7 @@ // todo: make this a per node request. public class ShardPrepareCommitRequest extends ReplicatedWriteRequest { - private TxID txID; + private final TxID txID; public ShardPrepareCommitRequest(ShardId shardId, TxID txID) { super(shardId); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 5de78d9e03c07..b1d0b7d45fbb0 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -742,8 +742,14 @@ private void commit(BulkResponse bulkResponse, ActionListener list ActionListener markAndCommitListener = new ActionListener() { @Override public void onResponse(Void ignored) { - // todo: mark and commit - listener.onResponse(bulkResponse); + // failure is not an option at this time YOLO. + GroupedActionListener commitResponseListener = + new GroupedActionListener<>(listener.map(responses -> bulkResponse), requestsByShard.size()); + requestsByShard.keySet() + .forEach(shardId -> + client.executeLocally(ShardMarkCommitAndIndexAction.INSTANCE, + new ShardMarkCommitAndIndexRequest(shardId, txID), commitResponseListener) + ); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 5a0907ef34d15..62d7b24806d6d 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -213,8 +213,6 @@ protected void doRun() throws Exception { } primary.loggingComplete(request.txID(), transactionId[0]); - transactionId[0] = primary.commitTransaction(request.txID()); - primary.closeTransaction(transactionId); } catch (Exception x) { logger.warn("Encountered an error while executing bulk transaction", x); primary.rollbackTransaction(transactionId); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardMarkCommitAndIndexAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardMarkCommitAndIndexAction.java new file mode 100644 index 0000000000000..19030d12ae97d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardMarkCommitAndIndexAction.java @@ -0,0 +1,92 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexingPressure; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.ExecutorSelector; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; + +public class TransportShardMarkCommitAndIndexAction extends TransportWriteAction< + ShardMarkCommitAndIndexRequest, + ShardMarkCommitAndIndexRequest, + ShardMarkCommitAndIndexResponse> { + + @Inject + public TransportShardMarkCommitAndIndexAction( + Settings settings, + TransportService transportService, + ClusterService clusterService, + IndicesService indicesService, + ThreadPool threadPool, + ShardStateAction shardStateAction, + ActionFilters actionFilters, + IndexingPressure indexingPressure, + SystemIndices systemIndices + ) { + super( + settings, + ShardMarkCommitAndIndexAction.NAME, + transportService, + clusterService, + indicesService, + threadPool, + shardStateAction, + actionFilters, + ShardMarkCommitAndIndexRequest::new, + ShardMarkCommitAndIndexRequest::new, + ExecutorSelector::getWriteExecutorForShard, + false, + indexingPressure, + systemIndices + ); + } + + @Override + protected ShardMarkCommitAndIndexResponse newResponseInstance(StreamInput in) throws IOException { + return new ShardMarkCommitAndIndexResponse(in); + } + + @Override + protected void dispatchedShardOperationOnPrimary( + ShardMarkCommitAndIndexRequest request, + IndexShard primary, + ActionListener> listener + ) { + ActionListener.completeWith(listener, () -> { + primary.commitTransaction(request.txid()); + return new PrimaryResult<>(request, new ShardMarkCommitAndIndexResponse()); + }); + } + + @Override + protected void dispatchedShardOperationOnReplica( + ShardMarkCommitAndIndexRequest request, + IndexShard replica, + ActionListener listener + ) { + ActionListener.completeWith(listener, () -> { + replica.commitTransaction(request.txid()); + return new ReplicaResult(); + }); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 4dc28bbcd3264..f0ead24e0252b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -865,6 +865,8 @@ private boolean canOptimizeAddDocument(Index index) { case LOCAL_RESET: assert index.isRetry(); return true; // allow to optimize in order to update the max safe time stamp + case TRANSACTION: + return false; default: throw new IllegalArgumentException("unknown origin " + index.origin()); } @@ -2101,8 +2103,8 @@ public Translog.Location commitTransaction(Translog.Location prevId, Function