Skip to content

Commit

Permalink
Set acking timeout to 0 on dynamic mapping update (#31140)
Browse files Browse the repository at this point in the history
As acking can fail for any reason (unrelated node being too slow, node disconnecting), it should not
be required for acking to succeed in order for index requests with dynamic mapping updates to
successfully complete.

Relates to #30672 and Closes #30844
  • Loading branch information
ywelsch committed Jan 24, 2019
1 parent 8765a31 commit 9f13eac
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.cluster.action.index;

import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
Expand Down Expand Up @@ -67,7 +66,7 @@ private PutMappingRequestBuilder updateMappingRequest(Index index, String type,
throw new IllegalArgumentException("_default_ mapping should not be updated");
}
return client.preparePutMapping().setConcreteIndex(index).setType(type).setSource(mappingUpdate.toString(), XContentType.JSON)
.setMasterNodeTimeout(timeout).setTimeout(timeout);
.setMasterNodeTimeout(timeout).setTimeout(TimeValue.ZERO);
}

/**
Expand All @@ -84,8 +83,6 @@ public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdat
* been applied to the master node and propagated to data nodes.
*/
public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate, TimeValue timeout) {
if (updateMappingRequest(index, type, mappingUpdate, timeout).get().isAcknowledged() == false) {
throw new ElasticsearchTimeoutException("Failed to acknowledge mapping update within [" + timeout + "]");
}
updateMappingRequest(index, type, mappingUpdate, timeout).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
* This retry logic is implemented in TransportMasterNodeAction and tested by the following master failover scenario.
*/
@TestLogging("_root:DEBUG")
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/30844")
public void testMasterFailoverDuringIndexingWithMappingChanges() throws Throwable {
logger.info("--> start 4 nodes, 3 master, 1 data");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
*/

import com.carrotsearch.hppc.cursors.IntObjectCursor;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -30,6 +32,7 @@
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
Expand Down Expand Up @@ -89,6 +92,34 @@ protected Settings nodeSettings(int nodeOrdinal) {
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false).build();
}

public void testBulkWeirdScenario() throws Exception {
String master = internalCluster().startMasterOnlyNode(Settings.EMPTY);
internalCluster().startDataOnlyNodes(2);

assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder()
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1)).get());
ensureGreen();

BulkResponse bulkResponse = client().prepareBulk()
.add(client().prepareIndex().setIndex("test").setType("_doc").setId("1").setSource("field1", "value1"))
.add(client().prepareUpdate().setIndex("test").setType("_doc").setId("1").setDoc("field2", "value2"))
.execute().actionGet();

assertThat(bulkResponse.hasFailures(), equalTo(false));
assertThat(bulkResponse.getItems().length, equalTo(2));

logger.info(Strings.toString(bulkResponse, true, true));

internalCluster().assertSeqNos();

assertThat(bulkResponse.getItems()[0].getResponse().getId(), equalTo("1"));
assertThat(bulkResponse.getItems()[0].getResponse().getVersion(), equalTo(1L));
assertThat(bulkResponse.getItems()[0].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
assertThat(bulkResponse.getItems()[1].getResponse().getId(), equalTo("1"));
assertThat(bulkResponse.getItems()[1].getResponse().getVersion(), equalTo(2L));
assertThat(bulkResponse.getItems()[1].getResponse().getResult(), equalTo(DocWriteResponse.Result.UPDATED));
}

private void createStaleReplicaScenario(String master) throws Exception {
client().prepareIndex("test", "type1").setSource(jsonBuilder()
.startObject().field("field", "value1").endObject()).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand Down Expand Up @@ -57,6 +58,7 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -392,6 +394,24 @@ public void onFailure(Exception e) {
// Wait for document to be indexed on primary
assertBusy(() -> assertTrue(client().prepareGet("index", "type", "1").setPreference("_primary").get().isExists()));

// index another document, this time using dynamic mappings.
// The ack timeout of 0 on dynamic mapping updates makes it possible for the document to be indexed on the primary, even
// if the dynamic mapping update is not applied on the replica yet.
ActionFuture<IndexResponse> dynamicMappingsFut = client().prepareIndex("index", "type", "2").setSource("field2", 42).execute();

// ...and wait for second mapping to be available on master
assertBusy(() -> {
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, master);
final IndexService indexService = indicesService.indexServiceSafe(index);
assertNotNull(indexService);
final MapperService mapperService = indexService.mapperService();
DocumentMapper mapper = mapperService.documentMapper("type");
assertNotNull(mapper);
assertNotNull(mapper.mappers().getMapper("field2"));
});

assertBusy(() -> assertTrue(client().prepareGet("index", "type", "2").get().isExists()));

// The mappings have not been propagated to the replica yet as a consequence the document count not be indexed
// We wait on purpose to make sure that the document is not indexed because the shard operation is stalled
// and not just because it takes time to replicate the indexing request to the replica
Expand All @@ -410,6 +430,8 @@ public void onFailure(Exception e) {
assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()),
2, docResp.getShardInfo().getTotal()); // both shards should have succeeded
});

assertThat(dynamicMappingsFut.get().getResult(), equalTo(CREATED));
}

}

0 comments on commit 9f13eac

Please sign in to comment.