Skip to content

Commit

Permalink
Distributed commit
Browse files Browse the repository at this point in the history
  • Loading branch information
henningandersen committed Nov 19, 2021
1 parent baad90c commit 61817f5
Show file tree
Hide file tree
Showing 10 changed files with 223 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
Expand Down Expand Up @@ -263,6 +264,12 @@ public void sendRequest(Transport.Connection connection, long requestId, String
// one failure
assertThat(response1.v2() != null, is(response2.v2() == null));

client().admin().indices().prepareRefresh("test").get();

GetResponse getResponse = client().prepareGet("test", "1").get();
// sadly this fails...
// assertThat(getResponse.isExists(), is(true));

}

Tuple<IndexResponse, Exception> resultOrException(ActionFuture<IndexResponse> future) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,11 @@
import org.elasticsearch.action.admin.indices.validate.query.TransportValidateQueryAction;
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.ShardMarkCommitAndIndexAction;
import org.elasticsearch.action.bulk.ShardPrepareCommitAction;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.bulk.TransportShardMarkCommitAndIndexAction;
import org.elasticsearch.action.bulk.TransportShardPrepareCommitAction;
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.datastreams.ModifyDataStreamsTransportAction;
Expand Down Expand Up @@ -604,6 +606,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(BulkAction.INSTANCE, TransportBulkAction.class);
actions.register(TransportShardBulkAction.TYPE, TransportShardBulkAction.class);
actions.register(ShardPrepareCommitAction.INSTANCE, TransportShardPrepareCommitAction.class);
actions.register(ShardMarkCommitAndIndexAction.INSTANCE, TransportShardMarkCommitAndIndexAction.class);
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
actions.register(OpenPointInTimeAction.INSTANCE, TransportOpenPointInTimeAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.ActionType;

public class ShardMarkCommitAndIndexAction extends ActionType<ShardMarkCommitAndIndexResponse> {

public static final ShardMarkCommitAndIndexAction INSTANCE = new ShardMarkCommitAndIndexAction();
// todo: should the name be bulk[prepare] style?
public static final String NAME = "indices:data/write/commit";

private ShardMarkCommitAndIndexAction() {
super(NAME, ShardMarkCommitAndIndexResponse::new);
}

// todo: transport options?
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;

// todo: make this a per node request.
public class ShardMarkCommitAndIndexRequest extends ReplicatedWriteRequest<ShardMarkCommitAndIndexRequest> {
private final TxID txID;

public ShardMarkCommitAndIndexRequest(ShardId shardId, TxID txID) {
super(shardId);
this.txID = txID;
}

public ShardMarkCommitAndIndexRequest(StreamInput in) throws IOException {
super(in);
this.txID = new TxID(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
txID.writeTo(out);
}

public TxID txid() {
return txID;
}

@Override
public String toString() {
return "[" + shardId + "," + txID + "]";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Map;

public class ShardMarkCommitAndIndexResponse extends ReplicationResponse implements WriteResponse {


public ShardMarkCommitAndIndexResponse() {
}

public ShardMarkCommitAndIndexResponse(StreamInput in) throws IOException {
super(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}

@Override
public void setForcedRefresh(boolean forcedRefresh) {
// this does not refresh currently.
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

// todo: make this a per node request.
public class ShardPrepareCommitRequest extends ReplicatedWriteRequest<ShardPrepareCommitRequest> {
private TxID txID;
private final TxID txID;

public ShardPrepareCommitRequest(ShardId shardId, TxID txID) {
super(shardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,8 +742,14 @@ private void commit(BulkResponse bulkResponse, ActionListener<BulkResponse> list
ActionListener<Void> markAndCommitListener = new ActionListener<Void>() {
@Override
public void onResponse(Void ignored) {
// todo: mark and commit
listener.onResponse(bulkResponse);
// failure is not an option at this time YOLO.
GroupedActionListener<ShardMarkCommitAndIndexResponse> commitResponseListener =
new GroupedActionListener<>(listener.map(responses -> bulkResponse), requestsByShard.size());
requestsByShard.keySet()
.forEach(shardId ->
client.executeLocally(ShardMarkCommitAndIndexAction.INSTANCE,
new ShardMarkCommitAndIndexRequest(shardId, txID), commitResponseListener)
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,6 @@ protected void doRun() throws Exception {
}

primary.loggingComplete(request.txID(), transactionId[0]);
transactionId[0] = primary.commitTransaction(request.txID());
primary.closeTransaction(transactionId);
} catch (Exception x) {
logger.warn("Encountered an error while executing bulk transaction", x);
primary.rollbackTransaction(transactionId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.ExecutorSelector;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

public class TransportShardMarkCommitAndIndexAction extends TransportWriteAction<
ShardMarkCommitAndIndexRequest,
ShardMarkCommitAndIndexRequest,
ShardMarkCommitAndIndexResponse> {

@Inject
public TransportShardMarkCommitAndIndexAction(
Settings settings,
TransportService transportService,
ClusterService clusterService,
IndicesService indicesService,
ThreadPool threadPool,
ShardStateAction shardStateAction,
ActionFilters actionFilters,
IndexingPressure indexingPressure,
SystemIndices systemIndices
) {
super(
settings,
ShardMarkCommitAndIndexAction.NAME,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
actionFilters,
ShardMarkCommitAndIndexRequest::new,
ShardMarkCommitAndIndexRequest::new,
ExecutorSelector::getWriteExecutorForShard,
false,
indexingPressure,
systemIndices
);
}

@Override
protected ShardMarkCommitAndIndexResponse newResponseInstance(StreamInput in) throws IOException {
return new ShardMarkCommitAndIndexResponse(in);
}

@Override
protected void dispatchedShardOperationOnPrimary(
ShardMarkCommitAndIndexRequest request,
IndexShard primary,
ActionListener<PrimaryResult<ShardMarkCommitAndIndexRequest, ShardMarkCommitAndIndexResponse>> listener
) {
ActionListener.completeWith(listener, () -> {
primary.commitTransaction(request.txid());
return new PrimaryResult<>(request, new ShardMarkCommitAndIndexResponse());
});
}

@Override
protected void dispatchedShardOperationOnReplica(
ShardMarkCommitAndIndexRequest request,
IndexShard replica,
ActionListener<ReplicaResult> listener
) {
ActionListener.completeWith(listener, () -> {
replica.commitTransaction(request.txid());
return new ReplicaResult();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,8 @@ private boolean canOptimizeAddDocument(Index index) {
case LOCAL_RESET:
assert index.isRetry();
return true; // allow to optimize in order to update the max safe time stamp
case TRANSACTION:
return false;
default:
throw new IllegalArgumentException("unknown origin " + index.origin());
}
Expand Down Expand Up @@ -2101,8 +2103,8 @@ public Translog.Location commitTransaction(Translog.Location prevId, Function<Tr
}
}

closeTransaction(commitLocation);
return commitLocation;

}

@Override
Expand Down

0 comments on commit 61817f5

Please sign in to comment.