diff --git a/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java index 770c6bca26b2f..c34a4196bb524 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster.action.index; -import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.client.IndicesAdminClient; @@ -67,7 +66,7 @@ private PutMappingRequestBuilder updateMappingRequest(Index index, String type, throw new IllegalArgumentException("_default_ mapping should not be updated"); } return client.preparePutMapping().setConcreteIndex(index).setType(type).setSource(mappingUpdate.toString(), XContentType.JSON) - .setMasterNodeTimeout(timeout).setTimeout(timeout); + .setMasterNodeTimeout(timeout).setTimeout(TimeValue.ZERO); } /** @@ -84,8 +83,6 @@ public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdat * been applied to the master node and propagated to data nodes. */ public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate, TimeValue timeout) { - if (updateMappingRequest(index, type, mappingUpdate, timeout).get().isAcknowledged() == false) { - throw new ElasticsearchTimeoutException("Failed to acknowledge mapping update within [" + timeout + "]"); - } + updateMappingRequest(index, type, mappingUpdate, timeout).get(); } } diff --git a/server/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java b/server/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java index c6fb7c49802f1..7a18ca4cff199 100644 --- a/server/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java +++ b/server/src/test/java/org/elasticsearch/action/support/master/IndexingMasterFailoverIT.java @@ -66,7 +66,6 @@ protected Settings nodeSettings(int nodeOrdinal) { * This retry logic is implemented in TransportMasterNodeAction and tested by the following master failover scenario. */ @TestLogging("_root:DEBUG") - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/30844") public void testMasterFailoverDuringIndexingWithMappingChanges() throws Throwable { logger.info("--> start 4 nodes, 3 master, 1 data"); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index b00cacd013fd8..3b1ef3234cd33 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -20,8 +20,10 @@ */ import com.carrotsearch.hppc.cursors.IntObjectCursor; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterState; @@ -30,6 +32,7 @@ import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenIntMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; @@ -89,6 +92,34 @@ protected Settings nodeSettings(int nodeOrdinal) { .put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false).build(); } + public void testBulkWeirdScenario() throws Exception { + String master = internalCluster().startMasterOnlyNode(Settings.EMPTY); + internalCluster().startDataOnlyNodes(2); + + assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder() + .put("index.number_of_shards", 1).put("index.number_of_replicas", 1)).get()); + ensureGreen(); + + BulkResponse bulkResponse = client().prepareBulk() + .add(client().prepareIndex().setIndex("test").setType("_doc").setId("1").setSource("field1", "value1")) + .add(client().prepareUpdate().setIndex("test").setType("_doc").setId("1").setDoc("field2", "value2")) + .execute().actionGet(); + + assertThat(bulkResponse.hasFailures(), equalTo(false)); + assertThat(bulkResponse.getItems().length, equalTo(2)); + + logger.info(Strings.toString(bulkResponse, true, true)); + + internalCluster().assertSeqNos(); + + assertThat(bulkResponse.getItems()[0].getResponse().getId(), equalTo("1")); + assertThat(bulkResponse.getItems()[0].getResponse().getVersion(), equalTo(1L)); + assertThat(bulkResponse.getItems()[0].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED)); + assertThat(bulkResponse.getItems()[1].getResponse().getId(), equalTo("1")); + assertThat(bulkResponse.getItems()[1].getResponse().getVersion(), equalTo(2L)); + assertThat(bulkResponse.getItems()[1].getResponse().getResult(), equalTo(DocWriteResponse.Result.UPDATED)); + } + private void createStaleReplicaScenario(String master) throws Exception { client().prepareIndex("test", "type1").setSource(jsonBuilder() .startObject().field("field", "value1").endObject()).get(); diff --git a/server/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java b/server/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java index 932091f0e8aba..cc95cc5b05c9e 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -57,6 +58,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.elasticsearch.action.DocWriteResponse.Result.CREATED; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; @@ -392,6 +394,24 @@ public void onFailure(Exception e) { // Wait for document to be indexed on primary assertBusy(() -> assertTrue(client().prepareGet("index", "type", "1").setPreference("_primary").get().isExists())); + // index another document, this time using dynamic mappings. + // The ack timeout of 0 on dynamic mapping updates makes it possible for the document to be indexed on the primary, even + // if the dynamic mapping update is not applied on the replica yet. + ActionFuture dynamicMappingsFut = client().prepareIndex("index", "type", "2").setSource("field2", 42).execute(); + + // ...and wait for second mapping to be available on master + assertBusy(() -> { + final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, master); + final IndexService indexService = indicesService.indexServiceSafe(index); + assertNotNull(indexService); + final MapperService mapperService = indexService.mapperService(); + DocumentMapper mapper = mapperService.documentMapper("type"); + assertNotNull(mapper); + assertNotNull(mapper.mappers().getMapper("field2")); + }); + + assertBusy(() -> assertTrue(client().prepareGet("index", "type", "2").get().isExists())); + // The mappings have not been propagated to the replica yet as a consequence the document count not be indexed // We wait on purpose to make sure that the document is not indexed because the shard operation is stalled // and not just because it takes time to replicate the indexing request to the replica @@ -410,6 +430,8 @@ public void onFailure(Exception e) { assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()), 2, docResp.getShardInfo().getTotal()); // both shards should have succeeded }); + + assertThat(dynamicMappingsFut.get().getResult(), equalTo(CREATED)); } }