Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set acking timeout to 0 on dynamic mapping update #31140

Merged
merged 5 commits into from
Jan 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.cluster.action.index;

import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
Expand Down Expand Up @@ -67,7 +66,7 @@ private PutMappingRequestBuilder updateMappingRequest(Index index, String type,
throw new IllegalArgumentException("_default_ mapping should not be updated");
}
return client.preparePutMapping().setConcreteIndex(index).setType(type).setSource(mappingUpdate.toString(), XContentType.JSON)
.setMasterNodeTimeout(timeout).setTimeout(timeout);
.setMasterNodeTimeout(timeout).setTimeout(TimeValue.ZERO);
}

/**
Expand All @@ -84,8 +83,6 @@ public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdat
* been applied to the master node and propagated to data nodes.
*/
public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate, TimeValue timeout) {
if (updateMappingRequest(index, type, mappingUpdate, timeout).get().isAcknowledged() == false) {
throw new ElasticsearchTimeoutException("Failed to acknowledge mapping update within [" + timeout + "]");
}
updateMappingRequest(index, type, mappingUpdate, timeout).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ protected Settings nodeSettings(int nodeOrdinal) {
* This retry logic is implemented in TransportMasterNodeAction and tested by the following master failover scenario.
*/
@TestLogging("_root:DEBUG")
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/30844")
public void testMasterFailoverDuringIndexingWithMappingChanges() throws Throwable {
logger.info("--> start 4 nodes, 3 master, 1 data");

final Settings sharedSettings = Settings.builder()
.put(FaultDetection.PING_TIMEOUT_SETTING.getKey(), "1s") // for hitting simulated network failures quickly
.put(FaultDetection.PING_RETRIES_SETTING.getKey(), "1") // for hitting simulated network failures quickly
.put(TestZenDiscovery.USE_ZEN2.getKey(), false)
.put("discovery.zen.join_timeout", "10s") // still long to induce failures but to long so test won't time out
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "1s") // <-- for hitting simulated network failures quickly
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), 2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
*/

import com.carrotsearch.hppc.cursors.IntObjectCursor;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -30,6 +32,7 @@
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
Expand Down Expand Up @@ -91,6 +94,34 @@ protected Settings nodeSettings(int nodeOrdinal) {
.put(TestZenDiscovery.USE_MOCK_PINGS.getKey(), false).build();
}

public void testBulkWeirdScenario() throws Exception {
String master = internalCluster().startMasterOnlyNode(Settings.EMPTY);
internalCluster().startDataOnlyNodes(2);

assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder()
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1)).get());
ensureGreen();

BulkResponse bulkResponse = client().prepareBulk()
.add(client().prepareIndex().setIndex("test").setType("_doc").setId("1").setSource("field1", "value1"))
.add(client().prepareUpdate().setIndex("test").setType("_doc").setId("1").setDoc("field2", "value2"))
.execute().actionGet();

assertThat(bulkResponse.hasFailures(), equalTo(false));
assertThat(bulkResponse.getItems().length, equalTo(2));

logger.info(Strings.toString(bulkResponse, true, true));

internalCluster().assertSeqNos();

assertThat(bulkResponse.getItems()[0].getResponse().getId(), equalTo("1"));
assertThat(bulkResponse.getItems()[0].getResponse().getVersion(), equalTo(1L));
assertThat(bulkResponse.getItems()[0].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
assertThat(bulkResponse.getItems()[1].getResponse().getId(), equalTo("1"));
assertThat(bulkResponse.getItems()[1].getResponse().getVersion(), equalTo(2L));
assertThat(bulkResponse.getItems()[1].getResponse().getResult(), equalTo(DocWriteResponse.Result.UPDATED));
}

private void createStaleReplicaScenario(String master) throws Exception {
client().prepareIndex("test", "type1").setSource(jsonBuilder()
.startObject().field("field", "value1").endObject()).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand Down Expand Up @@ -57,6 +58,7 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -397,6 +399,24 @@ public void onFailure(Exception e) {

assertBusy(() -> assertTrue(client().prepareGet("index", "type", "1").get().isExists()));

// index another document, this time using dynamic mappings.
// The ack timeout of 0 on dynamic mapping updates makes it possible for the document to be indexed on the primary, even
// if the dynamic mapping update is not applied on the replica yet.
ActionFuture<IndexResponse> dynamicMappingsFut = client().prepareIndex("index", "type", "2").setSource("field2", 42).execute();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I presume this just biffs up the test, or am I missing something specific?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also wonder if we sometime should not have an extra field

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the first document that is put on the test does not use dynamic mappings (there is an explicit putMapping call for that one). This one does an actual dynamic mapping update (the thing we changed in this PR).
Not having an extra field is just what the first document tests :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a comment to the test here to clarify

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, thanks. I thought that the first call was indeed dynamic (and also adds a type, which is why another call to only add a field is required). Thanks for clarifying.


// ...and wait for second mapping to be available on master
assertBusy(() -> {
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, master);
final IndexService indexService = indicesService.indexServiceSafe(index);
assertNotNull(indexService);
final MapperService mapperService = indexService.mapperService();
DocumentMapper mapper = mapperService.documentMapper("type");
assertNotNull(mapper);
assertNotNull(mapper.mappers().getMapper("field2"));
});

assertBusy(() -> assertTrue(client().prepareGet("index", "type", "2").get().isExists()));

// The mappings have not been propagated to the replica yet as a consequence the document count not be indexed
// We wait on purpose to make sure that the document is not indexed because the shard operation is stalled
// and not just because it takes time to replicate the indexing request to the replica
Expand All @@ -415,6 +435,8 @@ public void onFailure(Exception e) {
assertEquals(Arrays.toString(docResp.getShardInfo().getFailures()),
2, docResp.getShardInfo().getTotal()); // both shards should have succeeded
});

assertThat(dynamicMappingsFut.get().getResult(), equalTo(CREATED));
}

}