Skip to content

Commit

Permalink
Fast refresh indices to use search shards
Browse files Browse the repository at this point in the history
The changes of PR elastic#115019 were reverted because it induced ES-8275. Now
that the ticket is done, this PR re-introduces the reverted changes.

Fast refresh indices should now behave like non fast refresh indices in
how they execute (m)gets and searches. I.e., they should use the search
shards.

For BWC, we define a new transport version. We expect search shards to
be upgraded first, before promotable shards. Until the cluster is fully
upgraded, the promotable shards (whether upgraded or not) will still
receive and execute gets/searches locally.

Relates ES-9573
  • Loading branch information
kingherc committed Nov 12, 2024
1 parent bcf1bd4 commit e935cf0
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ROLE_MONITOR_STATS = def(8_787_00_0);
public static final TransportVersion DATA_STREAM_INDEX_VERSION_DEPRECATION_CHECK = def(8_788_00_0);
public static final TransportVersion ADD_COMPATIBILITY_VERSIONS_TO_NODE_INFO = def(8_789_00_0);
public static final TransportVersion FAST_REFRESH_ADAPT = def(8_790_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.injection.guice.Inject;
Expand Down Expand Up @@ -120,27 +119,18 @@ public void onPrimaryOperationComplete(
ActionListener<Void> listener
) {
assert replicaRequest.primaryRefreshResult.refreshed() : "primary has not refreshed";
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(
clusterService.state().metadata().index(indexShardRoutingTable.shardId().getIndex()).getSettings()
UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest(
indexShardRoutingTable,
replicaRequest.primaryRefreshResult.primaryTerm(),
replicaRequest.primaryRefreshResult.generation(),
false
);
transportService.sendRequest(
transportService.getLocalNode(),
TransportUnpromotableShardRefreshAction.NAME,
unpromotableReplicaRequest,
new ActionListenerResponseHandler<>(listener.safeMap(r -> null), in -> ActionResponse.Empty.INSTANCE, refreshExecutor)
);

// Indices marked with fast refresh do not rely on refreshing the unpromotables
if (fastRefresh) {
listener.onResponse(null);
} else {
UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest(
indexShardRoutingTable,
replicaRequest.primaryRefreshResult.primaryTerm(),
replicaRequest.primaryRefreshResult.generation(),
false
);
transportService.sendRequest(
transportService.getLocalNode(),
TransportUnpromotableShardRefreshAction.NAME,
unpromotableReplicaRequest,
new ActionListenerResponseHandler<>(listener.safeMap(r -> null), in -> ActionResponse.Empty.INSTANCE, refreshExecutor)
);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@

import java.util.List;

import static org.elasticsearch.TransportVersions.FAST_REFRESH_ADAPT;
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;

public class TransportUnpromotableShardRefreshAction extends TransportBroadcastUnpromotableAction<
UnpromotableShardRefreshRequest,
ActionResponse.Empty> {
Expand Down Expand Up @@ -73,6 +76,18 @@ protected void unpromotableShardOperation(
return;
}

// During an upgrade to FAST_REFRESH_ADAPT, we expect search shards to be first upgraded before the primary is upgraded. Thus,
// when the primary is upgraded, and starts to deliver unpromotable refreshes, we expect the search shards to be upgraded already.
// Note that the fast refresh setting is final.
// TODO: remove assertion (ES-9563)
assert INDEX_FAST_REFRESH_SETTING.get(shard.indexSettings().getSettings()) == false
|| transportService.getLocalNodeConnection().getTransportVersion().onOrAfter(FAST_REFRESH_ADAPT)
: "attempted to refresh a fast refresh search shard "
+ shard
+ " on transport version "
+ transportService.getLocalNodeConnection().getTransportVersion()
+ " (before FAST_REFRESH_ADAPT)";

ActionListener.run(responseListener, listener -> {
shard.waitForPrimaryTermAndGeneration(
request.getPrimaryTerm(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,10 @@ protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionLi
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
// TODO: Re-evaluate assertion (ES-8227)
// assert indexShard.indexSettings().isFastRefresh() == false
// : "a search shard should not receive a TransportGetAction for an index with fast refresh";
handleGetOnUnpromotableShard(request, indexShard, listener);
return;
}
// TODO: adapt assertion to assert only that it is not stateless (ES-9563)
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh()
: "in Stateless a promotable to primary shard can receive a TransportGetAction only if an index has the fast refresh setting";
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,10 @@ protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
// TODO: Re-evaluate assertion (ES-8227)
// assert indexShard.indexSettings().isFastRefresh() == false
// : "a search shard should not receive a TransportShardMultiGetAction for an index with fast refresh";
handleMultiGetOnUnpromotableShard(request, indexShard, listener);
return;
}
// TODO: adapt assertion to assert only that it is not stateless (ES-9563)
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh()
: "in Stateless a promotable to primary shard can receive a TransportShardMultiGetAction only if an index has "
+ "the fast refresh setting";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
Expand Down Expand Up @@ -53,9 +52,7 @@ public void refreshShard(
case WAIT_UNTIL -> waitUntil(indexShard, location, new ActionListener<>() {
@Override
public void onResponse(Boolean forced) {
// Fast refresh indices do not depend on the unpromotables being refreshed
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(indexShard.indexSettings().getSettings());
if (location != null && (indexShard.routingEntry().isSearchable() == false && fastRefresh == false)) {
if (location != null && indexShard.routingEntry().isSearchable() == false) {
refreshUnpromotables(indexShard, location, listener, forced, postWriteRefreshTimeout);
} else {
listener.onResponse(forced);
Expand All @@ -68,9 +65,7 @@ public void onFailure(Exception e) {
}
});
case IMMEDIATE -> immediate(indexShard, listener.delegateFailureAndWrap((l, r) -> {
// Fast refresh indices do not depend on the unpromotables being refreshed
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(indexShard.indexSettings().getSettings());
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0 && fastRefresh == false) {
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) {
sendUnpromotableRequests(indexShard, r.generation(), true, l, postWriteRefreshTimeout);
} else {
l.onResponse(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.TransportVersions.FAST_REFRESH_ADAPT;
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;

public class OperationRouting {
Expand Down Expand Up @@ -305,8 +306,14 @@ public ShardId shardId(ClusterState clusterState, String index, String id, @Null
}

public static boolean canSearchShard(ShardRouting shardRouting, ClusterState clusterState) {
// TODO: remove if and always return isSearchable (ES-9563)
if (INDEX_FAST_REFRESH_SETTING.get(clusterState.metadata().index(shardRouting.index()).getSettings())) {
return shardRouting.isPromotableToPrimary();
// Until all the cluster is upgraded, we send searches/gets to the primary (even if it has been upgraded) to execute locally.
if (clusterState.getMinTransportVersion().onOrAfter(FAST_REFRESH_ADAPT)) {
return shardRouting.isSearchable();
} else {
return shardRouting.isPromotableToPrimary();
}
} else {
return shardRouting.isSearchable();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;

import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;

/**
* This is a cache for {@link BitDocIdSet} based filters and is unbounded by size or time.
* <p>
Expand Down Expand Up @@ -105,10 +103,7 @@ static boolean shouldLoadRandomAccessFiltersEagerly(IndexSettings settings) {
boolean loadFiltersEagerlySetting = settings.getValue(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING);
boolean isStateless = DiscoveryNode.isStateless(settings.getNodeSettings());
if (isStateless) {
return loadFiltersEagerlySetting
&& (DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.SEARCH_ROLE)
|| (DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.INDEX_ROLE)
&& INDEX_FAST_REFRESH_SETTING.get(settings.getSettings())));
return loadFiltersEagerlySetting && DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.SEARCH_ROLE);
} else {
return loadFiltersEagerlySetting;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.cluster.routing;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -19,6 +20,7 @@

import java.util.List;

import static org.elasticsearch.TransportVersions.FAST_REFRESH_ADAPT;
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
Expand All @@ -27,16 +29,22 @@
public class IndexRoutingTableTests extends ESTestCase {

public void testReadyForSearch() {
innerReadyForSearch(false);
innerReadyForSearch(true);
innerReadyForSearch(false, false);
innerReadyForSearch(false, true);
innerReadyForSearch(true, false);
innerReadyForSearch(true, true);
}

private void innerReadyForSearch(boolean fastRefresh) {
// TODO: remove if (fastRefresh && beforeFastRefreshRCO) branches (ES-9563)
private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshRCO) {
Index index = new Index(randomIdentifier(), UUIDs.randomBase64UUID());
ClusterState clusterState = mock(ClusterState.class, Mockito.RETURNS_DEEP_STUBS);
when(clusterState.metadata().index(any(Index.class)).getSettings()).thenReturn(
Settings.builder().put(INDEX_FAST_REFRESH_SETTING.getKey(), fastRefresh).build()
);
when(clusterState.getMinTransportVersion()).thenReturn(
beforeFastRefreshRCO ? TransportVersion.fromId(FAST_REFRESH_ADAPT.id() - 1_00_0) : TransportVersion.current()
);
// 2 primaries that are search and index
ShardId p1 = new ShardId(index, 0);
IndexShardRoutingTable shardTable1 = new IndexShardRoutingTable(
Expand All @@ -55,7 +63,7 @@ private void innerReadyForSearch(boolean fastRefresh) {
shardTable1 = new IndexShardRoutingTable(p1, List.of(getShard(p1, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY)));
shardTable2 = new IndexShardRoutingTable(p2, List.of(getShard(p2, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY)));
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
if (fastRefresh) {
if (fastRefresh && beforeFastRefreshRCO) {
assertTrue(indexRoutingTable.readyForSearch(clusterState));
} else {
assertFalse(indexRoutingTable.readyForSearch(clusterState));
Expand Down Expand Up @@ -91,7 +99,7 @@ private void innerReadyForSearch(boolean fastRefresh) {
)
);
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
if (fastRefresh) {
if (fastRefresh && beforeFastRefreshRCO) {
assertTrue(indexRoutingTable.readyForSearch(clusterState));
} else {
assertFalse(indexRoutingTable.readyForSearch(clusterState));
Expand All @@ -118,8 +126,6 @@ private void innerReadyForSearch(boolean fastRefresh) {
assertTrue(indexRoutingTable.readyForSearch(clusterState));

// 2 unassigned primaries that are index only with some replicas that are all available
// Fast refresh indices do not support replicas so this can not practically happen. If we add support we will want to ensure
// that readyForSearch allows for searching replicas when the index shard is not available.
shardTable1 = new IndexShardRoutingTable(
p1,
List.of(
Expand All @@ -137,8 +143,8 @@ private void innerReadyForSearch(boolean fastRefresh) {
)
);
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
if (fastRefresh) {
assertFalse(indexRoutingTable.readyForSearch(clusterState)); // if we support replicas for fast refreshes this needs to change
if (fastRefresh && beforeFastRefreshRCO) {
assertFalse(indexRoutingTable.readyForSearch(clusterState));
} else {
assertTrue(indexRoutingTable.readyForSearch(clusterState));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME;
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
import static org.elasticsearch.index.cache.bitset.BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -273,35 +272,21 @@ public void testShouldLoadRandomAccessFiltersEagerly() {
for (var hasIndexRole : values) {
for (var loadFiltersEagerly : values) {
for (var isStateless : values) {
for (var fastRefresh : values) {
if (isStateless == false && fastRefresh) {
// fast refresh is only relevant for stateless indices
continue;
}

boolean result = BitsetFilterCache.shouldLoadRandomAccessFiltersEagerly(
bitsetFilterCacheSettings(isStateless, hasIndexRole, loadFiltersEagerly, fastRefresh)
);
if (isStateless) {
assertEquals(loadFiltersEagerly && ((hasIndexRole && fastRefresh) || hasIndexRole == false), result);
} else {
assertEquals(loadFiltersEagerly, result);
}
boolean result = BitsetFilterCache.shouldLoadRandomAccessFiltersEagerly(
bitsetFilterCacheSettings(isStateless, hasIndexRole, loadFiltersEagerly)
);
if (isStateless) {
assertEquals(loadFiltersEagerly && hasIndexRole == false, result);
} else {
assertEquals(loadFiltersEagerly, result);
}
}
}
}
}

private IndexSettings bitsetFilterCacheSettings(
boolean isStateless,
boolean hasIndexRole,
boolean loadFiltersEagerly,
boolean fastRefresh
) {
var indexSettingsBuilder = Settings.builder()
.put(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING.getKey(), loadFiltersEagerly)
.put(INDEX_FAST_REFRESH_SETTING.getKey(), fastRefresh);
private IndexSettings bitsetFilterCacheSettings(boolean isStateless, boolean hasIndexRole, boolean loadFiltersEagerly) {
var indexSettingsBuilder = Settings.builder().put(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING.getKey(), loadFiltersEagerly);

var nodeSettingsBuilder = Settings.builder()
.putList(
Expand Down

0 comments on commit e935cf0

Please sign in to comment.