Skip to content

Commit

Permalink
Set acking timeout to 0 on dynamic mapping update
Browse files Browse the repository at this point in the history
  • Loading branch information
ywelsch committed Jan 11, 2019
1 parent 4d3928d commit 779cb2c
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private PutMappingRequestBuilder updateMappingRequest(Index index, String type,
throw new IllegalArgumentException("_default_ mapping should not be updated");
}
return client.preparePutMapping().setConcreteIndex(index).setType(type).setSource(mappingUpdate.toString(), XContentType.JSON)
.setMasterNodeTimeout(timeout).setTimeout(timeout);
.setMasterNodeTimeout(timeout).setTimeout(TimeValue.ZERO);
}

/**
Expand All @@ -84,8 +84,6 @@ public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdat
* been applied to the master node and propagated to data nodes.
*/
public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate, TimeValue timeout) {
if (updateMappingRequest(index, type, mappingUpdate, timeout).get().isAcknowledged() == false) {
throw new ElasticsearchTimeoutException("Failed to acknowledge mapping update within [" + timeout + "]");
}
updateMappingRequest(index, type, mappingUpdate, timeout).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ protected Settings nodeSettings(int nodeOrdinal) {
* This retry logic is implemented in TransportMasterNodeAction and tested by the following master failover scenario.
*/
@TestLogging("_root:DEBUG")
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/30844")
public void testMasterFailoverDuringIndexingWithMappingChanges() throws Throwable {
logger.info("--> start 4 nodes, 3 master, 1 data");

final Settings sharedSettings = Settings.builder()
.put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly
.put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly
.put(TestZenDiscovery.USE_ZEN2.getKey(), false)
.put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
*/

import com.carrotsearch.hppc.cursors.IntObjectCursor;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -30,6 +32,7 @@
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
Expand Down Expand Up @@ -91,6 +94,34 @@ protected Settings nodeSettings(int nodeOrdinal) {
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false).build();
}

public void testBulkWeirdScenario() throws Exception {
String master = internalCluster().startMasterOnlyNode(Settings.EMPTY);
internalCluster().startDataOnlyNodes(2);

assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder()
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1)).get());
ensureGreen();

BulkResponse bulkResponse = client().prepareBulk()
.add(client().prepareIndex().setIndex("test").setType("_doc").setId("1").setSource("field1", "value1"))
.add(client().prepareUpdate().setIndex("test").setType("_doc").setId("1").setDoc("field2", "value2"))
.execute().actionGet();

assertThat(bulkResponse.hasFailures(), equalTo(false));
assertThat(bulkResponse.getItems().length, equalTo(2));

logger.info(Strings.toString(bulkResponse, true, true));

internalCluster().assertSeqNos();

assertThat(bulkResponse.getItems()[0].getResponse().getId(), equalTo("1"));
assertThat(bulkResponse.getItems()[0].getResponse().getVersion(), equalTo(1L));
assertThat(bulkResponse.getItems()[0].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
assertThat(bulkResponse.getItems()[1].getResponse().getId(), equalTo("1"));
assertThat(bulkResponse.getItems()[1].getResponse().getVersion(), equalTo(2L));
assertThat(bulkResponse.getItems()[1].getResponse().getResult(), equalTo(DocWriteResponse.Result.UPDATED));
}

private void createStaleReplicaScenario(String master) throws Exception {
client().prepareIndex("test", "type1").setSource(jsonBuilder()
.startObject().field("field", "value1").endObject()).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand Down Expand Up @@ -57,6 +58,7 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -397,6 +399,21 @@ public void onFailure(Exception e) {

assertBusy(() -> assertTrue(client().prepareGet("index", "type", "1").get().isExists()));

ActionFuture<IndexResponse> dynamicMappingsFut = client().prepareIndex("index", "type", "2").setSource("field2", 42).execute();

// ...and wait for second mapping to be available on master
assertBusy(() -> {
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, master);
final IndexService indexService = indicesService.indexServiceSafe(index);
assertNotNull(indexService);
final MapperService mapperService = indexService.mapperService();
DocumentMapper mapper = mapperService.documentMapper("type");
assertNotNull(mapper);
assertNotNull(mapper.mappers().getMapper("field2"));
});

assertBusy(() -> assertTrue(client().prepareGet("index", "type", "2").get().isExists()));

// The mappings have not been propagated to the replica yet as a consequence the document count not be indexed
// We wait on purpose to make sure that the document is not indexed because the shard operation is stalled
// and not just because it takes time to replicate the indexing request to the replica
Expand All @@ -415,6 +432,8 @@ public void onFailure(Exception e) {
assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()),
2, docResp.getShardInfo().getTotal()); // both shards should have succeeded
});

assertThat(dynamicMappingsFut.get().getResult(), equalTo(CREATED));
}

}

0 comments on commit 779cb2c

Please sign in to comment.