From d4831c2551680ae929dd2dee7f8b343c02d9e2f7 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 29 Jan 2019 13:15:58 +0100 Subject: [PATCH] Close Index API should force a flush if a sync is needed (#37961) This commit changes the TransportVerifyShardBeforeCloseAction so that it issues a forced flush, forcing the translog and the Lucene commit to contain the same max seq number and global checkpoint in the case the Translog contains operations that were not written in the IndexWriter (like a Delete that touches a non existing doc). This way the assertion added in #37426 won't trip. Related to #33888 --- ...TransportVerifyShardBeforeCloseAction.java | 9 ++++-- .../index/engine/FrozenIndexTests.java | 30 +++++++++++++++++++ 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index f08f6ea7dffa2..a36a012f397ec 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -18,6 +18,8 @@ */ package org.elasticsearch.action.admin.indices.close; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.support.ActionFilters; @@ -50,6 +52,7 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> { public static final String NAME = CloseIndexAction.NAME + "[s]"; + protected Logger logger = LogManager.getLogger(getClass()); @Inject public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService, @@ -111,8 +114,10 @@ private void executeShardOperation(final ShardRequest request, final IndexShard throw new IllegalStateException("Global checkpoint [" + indexShard.getGlobalCheckpoint() + "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId); } - indexShard.flush(new FlushRequest()); - logger.debug("{} shard is ready for closing", shardId); + + final boolean forced = indexShard.isSyncNeeded(); + indexShard.flush(new FlushRequest().force(forced)); + logger.trace("{} shard is ready for closing [forced:{}]", shardId, forced); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java index c0493b6efd1fe..094c79efb50a9 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java @@ -8,6 +8,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.support.IndicesOptions; @@ -26,6 +27,7 @@ import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.AliasFilter; @@ -46,6 +48,8 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; public class FrozenIndexTests extends ESSingleNodeTestCase { @@ -340,4 +344,30 @@ public void testFreezeIndexIncreasesIndexSettingsVersion() throws ExecutionExcep assertThat(client().admin().cluster().prepareState().get().getState().metaData().index(index).getSettingsVersion(), equalTo(settingsVersion + 1)); } + + public void testFreezeEmptyIndexWithTranslogOps() throws Exception { + final String indexName = "empty"; + createIndex(indexName, Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.refresh_interval", TimeValue.MINUS_ONE) + .build()); + + final long nbNoOps = randomIntBetween(1, 10); + for (long i = 0; i < nbNoOps; i++) { + final DeleteResponse deleteResponse = client().prepareDelete(indexName, "_doc", Long.toString(i)).get(); + assertThat(deleteResponse.status(), is(RestStatus.NOT_FOUND)); + } + + final IndicesService indicesService = getInstanceFromNode(IndicesService.class); + assertBusy(() -> { + final Index index = client().admin().cluster().prepareState().get().getState().metaData().index(indexName).getIndex(); + final IndexService indexService = indicesService.indexService(index); + assertThat(indexService.hasShard(0), is(true)); + assertThat(indexService.getShard(0).getGlobalCheckpoint(), greaterThanOrEqualTo(nbNoOps - 1L)); + }); + + assertAcked(new XPackClient(client()).freeze(new TransportFreezeIndexAction.FreezeRequest(indexName))); + assertIndexFrozen(indexName); + } }