Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert fast refresh using search shards #115019

Merged
merged 3 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.injection.guice.Inject;
Expand Down Expand Up @@ -119,18 +120,27 @@ public void onPrimaryOperationComplete(
ActionListener<Void> listener
) {
assert replicaRequest.primaryRefreshResult.refreshed() : "primary has not refreshed";
UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest(
indexShardRoutingTable,
replicaRequest.primaryRefreshResult.primaryTerm(),
replicaRequest.primaryRefreshResult.generation(),
false
);
transportService.sendRequest(
transportService.getLocalNode(),
TransportUnpromotableShardRefreshAction.NAME,
unpromotableReplicaRequest,
new ActionListenerResponseHandler<>(listener.safeMap(r -> null), in -> ActionResponse.Empty.INSTANCE, refreshExecutor)
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(
clusterService.state().metadata().index(indexShardRoutingTable.shardId().getIndex()).getSettings()
);

// Indices marked with fast refresh do not rely on refreshing the unpromotables
if (fastRefresh) {
listener.onResponse(null);
} else {
UnpromotableShardRefreshRequest unpromotableReplicaRequest = new UnpromotableShardRefreshRequest(
indexShardRoutingTable,
replicaRequest.primaryRefreshResult.primaryTerm(),
replicaRequest.primaryRefreshResult.generation(),
false
);
transportService.sendRequest(
transportService.getLocalNode(),
TransportUnpromotableShardRefreshAction.NAME,
unpromotableReplicaRequest,
new ActionListenerResponseHandler<>(listener.safeMap(r -> null), in -> ActionResponse.Empty.INSTANCE, refreshExecutor)
);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@

import java.util.List;

import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO;
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;

public class TransportUnpromotableShardRefreshAction extends TransportBroadcastUnpromotableAction<
UnpromotableShardRefreshRequest,
ActionResponse.Empty> {
Expand Down Expand Up @@ -76,18 +73,6 @@ protected void unpromotableShardOperation(
return;
}

// During an upgrade to FAST_REFRESH_RCO, we expect search shards to be first upgraded before the primary is upgraded. Thus,
// when the primary is upgraded, and starts to deliver unpromotable refreshes, we expect the search shards to be upgraded already.
// Note that the fast refresh setting is final.
// TODO: remove assertion (ES-9563)
assert INDEX_FAST_REFRESH_SETTING.get(shard.indexSettings().getSettings()) == false
|| transportService.getLocalNodeConnection().getTransportVersion().onOrAfter(FAST_REFRESH_RCO)
: "attempted to refresh a fast refresh search shard "
+ shard
+ " on transport version "
+ transportService.getLocalNodeConnection().getTransportVersion()
+ " (before FAST_REFRESH_RCO)";

ActionListener.run(responseListener, listener -> {
shard.waitForPrimaryTermAndGeneration(
request.getPrimaryTerm(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,11 @@ protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionLi
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
assert indexShard.indexSettings().isFastRefresh() == false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no idea how serverless upgrade tests are working and we'll see what CI says, but I suppose this can trip if a request is wrongly redirected in a mixed cluster.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, lets not risk it. I'll comment the assertion and add a TODO.

: "a search shard should not receive a TransportGetAction for an index with fast refresh";
handleGetOnUnpromotableShard(request, indexShard, listener);
return;
}
// TODO: adapt assertion to assert only that it is not stateless (ES-9563)
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh()
: "in Stateless a promotable to primary shard can receive a TransportGetAction only if an index has the fast refresh setting";
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,11 @@ protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
assert indexShard.indexSettings().isFastRefresh() == false
: "a search shard should not receive a TransportShardMultiGetAction for an index with fast refresh";
handleMultiGetOnUnpromotableShard(request, indexShard, listener);
return;
}
// TODO: adapt assertion to assert only that it is not stateless (ES-9563)
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false || indexShard.indexSettings().isFastRefresh()
: "in Stateless a promotable to primary shard can receive a TransportShardMultiGetAction only if an index has "
+ "the fast refresh setting";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
Expand Down Expand Up @@ -52,7 +53,9 @@ public void refreshShard(
case WAIT_UNTIL -> waitUntil(indexShard, location, new ActionListener<>() {
@Override
public void onResponse(Boolean forced) {
if (location != null && indexShard.routingEntry().isSearchable() == false) {
// Fast refresh indices do not depend on the unpromotables being refreshed
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(indexShard.indexSettings().getSettings());
if (location != null && (indexShard.routingEntry().isSearchable() == false && fastRefresh == false)) {
refreshUnpromotables(indexShard, location, listener, forced, postWriteRefreshTimeout);
} else {
listener.onResponse(forced);
Expand All @@ -65,7 +68,9 @@ public void onFailure(Exception e) {
}
});
case IMMEDIATE -> immediate(indexShard, listener.delegateFailureAndWrap((l, r) -> {
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0) {
// Fast refresh indices do not depend on the unpromotables being refreshed
boolean fastRefresh = IndexSettings.INDEX_FAST_REFRESH_SETTING.get(indexShard.indexSettings().getSettings());
if (indexShard.getReplicationGroup().getRoutingTable().unpromotableShards().size() > 0 && fastRefresh == false) {
sendUnpromotableRequests(indexShard, r.generation(), true, l, postWriteRefreshTimeout);
} else {
l.onResponse(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO;
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;

public class OperationRouting {
Expand Down Expand Up @@ -306,14 +305,8 @@ public ShardId shardId(ClusterState clusterState, String index, String id, @Null
}

public static boolean canSearchShard(ShardRouting shardRouting, ClusterState clusterState) {
// TODO: remove if and always return isSearchable (ES-9563)
if (INDEX_FAST_REFRESH_SETTING.get(clusterState.metadata().index(shardRouting.index()).getSettings())) {
// Until all the cluster is upgraded, we send searches/gets to the primary (even if it has been upgraded) to execute locally.
if (clusterState.getMinTransportVersion().onOrAfter(FAST_REFRESH_RCO)) {
return shardRouting.isSearchable();
} else {
return shardRouting.isPromotableToPrimary();
}
return shardRouting.isPromotableToPrimary();
} else {
return shardRouting.isSearchable();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;

import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;

/**
* This is a cache for {@link BitDocIdSet} based filters and is unbounded by size or time.
* <p>
Expand Down Expand Up @@ -103,7 +105,10 @@ static boolean shouldLoadRandomAccessFiltersEagerly(IndexSettings settings) {
boolean loadFiltersEagerlySetting = settings.getValue(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING);
boolean isStateless = DiscoveryNode.isStateless(settings.getNodeSettings());
if (isStateless) {
return loadFiltersEagerlySetting && DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.SEARCH_ROLE);
return loadFiltersEagerlySetting
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was not sure whether to revert this part to its original state, as it was wrong. This has the discussion. Basically the cache should be loaded whenever the shard can be searched. But welcome feedback.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

&& (DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.SEARCH_ROLE)
|| (DiscoveryNode.hasRole(settings.getNodeSettings(), DiscoveryNodeRole.INDEX_ROLE)
&& INDEX_FAST_REFRESH_SETTING.get(settings.getSettings())));
} else {
return loadFiltersEagerlySetting;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

package org.elasticsearch.cluster.routing;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -20,7 +19,6 @@

import java.util.List;

import static org.elasticsearch.TransportVersions.FAST_REFRESH_RCO;
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
Expand All @@ -29,22 +27,16 @@
public class IndexRoutingTableTests extends ESTestCase {

public void testReadyForSearch() {
innerReadyForSearch(false, false);
innerReadyForSearch(false, true);
innerReadyForSearch(true, false);
innerReadyForSearch(true, true);
innerReadyForSearch(false);
innerReadyForSearch(true);
}

// TODO: remove if (fastRefresh && beforeFastRefreshRCO) branches (ES-9563)
private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshRCO) {
private void innerReadyForSearch(boolean fastRefresh) {
Index index = new Index(randomIdentifier(), UUIDs.randomBase64UUID());
ClusterState clusterState = mock(ClusterState.class, Mockito.RETURNS_DEEP_STUBS);
when(clusterState.metadata().index(any(Index.class)).getSettings()).thenReturn(
Settings.builder().put(INDEX_FAST_REFRESH_SETTING.getKey(), fastRefresh).build()
);
when(clusterState.getMinTransportVersion()).thenReturn(
beforeFastRefreshRCO ? TransportVersion.fromId(FAST_REFRESH_RCO.id() - 1_00_0) : TransportVersion.current()
);
// 2 primaries that are search and index
ShardId p1 = new ShardId(index, 0);
IndexShardRoutingTable shardTable1 = new IndexShardRoutingTable(
Expand All @@ -63,7 +55,7 @@ private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshR
shardTable1 = new IndexShardRoutingTable(p1, List.of(getShard(p1, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY)));
shardTable2 = new IndexShardRoutingTable(p2, List.of(getShard(p2, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY)));
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
if (fastRefresh && beforeFastRefreshRCO) {
if (fastRefresh) {
assertTrue(indexRoutingTable.readyForSearch(clusterState));
} else {
assertFalse(indexRoutingTable.readyForSearch(clusterState));
Expand Down Expand Up @@ -99,7 +91,7 @@ private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshR
)
);
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
if (fastRefresh && beforeFastRefreshRCO) {
if (fastRefresh) {
assertTrue(indexRoutingTable.readyForSearch(clusterState));
} else {
assertFalse(indexRoutingTable.readyForSearch(clusterState));
Expand All @@ -126,6 +118,8 @@ private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshR
assertTrue(indexRoutingTable.readyForSearch(clusterState));

// 2 unassigned primaries that are index only with some replicas that are all available
// Fast refresh indices do not support replicas so this can not practically happen. If we add support we will want to ensure
// that readyForSearch allows for searching replicas when the index shard is not available.
shardTable1 = new IndexShardRoutingTable(
p1,
List.of(
Expand All @@ -143,8 +137,8 @@ private void innerReadyForSearch(boolean fastRefresh, boolean beforeFastRefreshR
)
);
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
if (fastRefresh && beforeFastRefreshRCO) {
assertFalse(indexRoutingTable.readyForSearch(clusterState));
if (fastRefresh) {
assertFalse(indexRoutingTable.readyForSearch(clusterState)); // if we support replicas for fast refreshes this needs to change
} else {
assertTrue(indexRoutingTable.readyForSearch(clusterState));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME;
import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
import static org.elasticsearch.index.cache.bitset.BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -272,21 +273,35 @@ public void testShouldLoadRandomAccessFiltersEagerly() {
for (var hasIndexRole : values) {
for (var loadFiltersEagerly : values) {
for (var isStateless : values) {
boolean result = BitsetFilterCache.shouldLoadRandomAccessFiltersEagerly(
bitsetFilterCacheSettings(isStateless, hasIndexRole, loadFiltersEagerly)
);
if (isStateless) {
assertEquals(loadFiltersEagerly && hasIndexRole == false, result);
} else {
assertEquals(loadFiltersEagerly, result);
for (var fastRefresh : values) {
if (isStateless == false && fastRefresh) {
// fast refresh is only relevant for stateless indices
continue;
}

boolean result = BitsetFilterCache.shouldLoadRandomAccessFiltersEagerly(
bitsetFilterCacheSettings(isStateless, hasIndexRole, loadFiltersEagerly, fastRefresh)
);
if (isStateless) {
assertEquals(loadFiltersEagerly && ((hasIndexRole && fastRefresh) || hasIndexRole == false), result);
} else {
assertEquals(loadFiltersEagerly, result);
}
}
}
}
}
}

private IndexSettings bitsetFilterCacheSettings(boolean isStateless, boolean hasIndexRole, boolean loadFiltersEagerly) {
var indexSettingsBuilder = Settings.builder().put(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING.getKey(), loadFiltersEagerly);
private IndexSettings bitsetFilterCacheSettings(
boolean isStateless,
boolean hasIndexRole,
boolean loadFiltersEagerly,
boolean fastRefresh
) {
var indexSettingsBuilder = Settings.builder()
.put(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING.getKey(), loadFiltersEagerly)
.put(INDEX_FAST_REFRESH_SETTING.getKey(), fastRefresh);

var nodeSettingsBuilder = Settings.builder()
.putList(
Expand Down