[8.x] Don't skip shards in coord rewrite if timestamp is an alias (elastic#117271) (elastic#117912)

The coordinator rewrite has logic to skip an index's shards when the provided date range
filter is disjoint from the min and max range reported by all of its shards. This mechanism
is enabled for the event.ingested and @timestamp fields, against searchable snapshots.
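
As an illustration of the skip decision, here is a minimal standalone sketch (not the
actual Elasticsearch implementation; class and method names are made up):

    // Sketch: the coordinator may only skip an index when the query's date
    // range is provably disjoint from the [min, max] range its shards reported.
    final class CoordRewriteSketch {

        enum Relation { DISJOINT, INTERSECTS }

        static Relation relate(long shardMin, long shardMax, long queryFrom, long queryTo) {
            if (queryTo < shardMin || queryFrom > shardMax) {
                return Relation.DISJOINT; // no document can match: safe to skip
            }
            return Relation.INTERSECTS; // overlap is possible: the shards must be searched
        }

        public static void main(String[] args) {
            // Shards cover [1000, 2000]; a filter over [3000, 4000] can be skipped.
            System.out.println(relate(1000, 2000, 3000, 4000)); // DISJOINT
            System.out.println(relate(1000, 2000, 1500, 4000)); // INTERSECTS
        }
    }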

We have basic checks that such fields must be of date field type. Yet if one of them is
defined as an alias of a date field, its reported range ends up EMPTY, which signals that
the shards hold no values for the field; the coord rewrite logic then resolves the alias
and ends up skipping shards that may contain matching docs.
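
A minimal sketch of the failure mode, with stubbed types standing in for the real mapper
lookup (MapperService is not actually a Map; the field names below are illustrative):

    import java.util.Map;

    final class AliasLookupSketch {

        // Stand-in for DateFieldMapper.DateFieldType: name() is the *target* field's name.
        record DateFieldType(String name) {}

        public static void main(String[] args) {
            // "@timestamp" is mapped as an alias whose path points at "created_at".
            Map<String, DateFieldType> lookup = Map.of(
                "created_at", new DateFieldType("created_at"),
                "@timestamp", new DateFieldType("created_at") // alias resolves to its target
            );

            DateFieldType resolved = lookup.get("@timestamp");
            // The instanceof-date check passes, but the resolved type's name differs
            // from the name that was asked for -- the tell-tale sign of an alias,
            // which is exactly what the fix below tests for.
            System.out.println("@timestamp".equals(resolved.name())); // false
        }
    }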

This commit adds an explicit check that reports the range as UNKNOWN instead of EMPTY
in these circumstances. The same check is also performed in the coord rewrite logic,
so that shards are no longer skipped by mistake.
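
The distinction matters because EMPTY and UNKNOWN license different decisions. A
hypothetical sketch of the semantics (the real classes are IndexLongFieldRange and
ShardLongFieldRange; this enum is only illustrative):

    final class RangeReportSketch {

        enum FieldRange {
            EMPTY,   // all shards reported and none holds a value: skipping is safe
            UNKNOWN, // no usable information (e.g. the field is an alias): must search
            PRESENT  // concrete [min, max] bounds are available for comparison
        }

        static boolean maySkipWithoutSearching(FieldRange range) {
            // Only a provably empty range justifies skipping outright;
            // UNKNOWN always falls through to searching the shards.
            return range == FieldRange.EMPTY;
        }

        public static void main(String[] args) {
            System.out.println(maySkipWithoutSearching(FieldRange.EMPTY));   // true
            System.out.println(maySkipWithoutSearching(FieldRange.UNKNOWN)); // false
        }
    }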
javanna authored Dec 3, 2024
1 parent e6ed956 commit 2205461
Showing 5 changed files with 130 additions and 10 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/117271.yaml
@@ -0,0 +1,5 @@
+pr: 117271
+summary: Don't skip shards in coord rewrite if timestamp is an alias
+area: Search
+type: bug
+issues: []
@@ -438,6 +438,7 @@ public String getWriteableName() {
     protected MappedFieldType.Relation getRelation(final CoordinatorRewriteContext coordinatorRewriteContext) {
         final MappedFieldType fieldType = coordinatorRewriteContext.getFieldType(fieldName);
         if (fieldType instanceof final DateFieldMapper.DateFieldType dateFieldType) {
+            assert fieldName.equals(fieldType.name());
             IndexLongFieldRange fieldRange = coordinatorRewriteContext.getFieldRange(fieldName);
             if (fieldRange.isComplete() == false || fieldRange == IndexLongFieldRange.EMPTY) {
                 // if not all shards for this (frozen) index have reported ranges to cluster state, OR if they
@@ -2258,8 +2258,8 @@ private ShardLongFieldRange determineShardLongFieldRange(String fieldName) {
             return ShardLongFieldRange.UNKNOWN; // no mapper service, no idea if the field even exists
         }
         final MappedFieldType mappedFieldType = mapperService().fieldType(fieldName);
-        if (mappedFieldType instanceof DateFieldMapper.DateFieldType == false) {
-            return ShardLongFieldRange.UNKNOWN; // field missing or not a date
+        if (mappedFieldType instanceof DateFieldMapper.DateFieldType == false || mappedFieldType.name().equals(fieldName) == false) {
+            return ShardLongFieldRange.UNKNOWN; // field is missing, an alias (as the field type has a different name) or not a date field
         }
         if (mappedFieldType.isIndexed() == false) {
             return ShardLongFieldRange.UNKNOWN; // range information missing
@@ -166,11 +166,13 @@ private static DateFieldRangeInfo fromMapperService(MapperService mapperService)
         DateFieldMapper.DateFieldType eventIngestedFieldType = null;

         MappedFieldType mappedFieldType = mapperService.fieldType(DataStream.TIMESTAMP_FIELD_NAME);
-        if (mappedFieldType instanceof DateFieldMapper.DateFieldType dateFieldType) {
+        if (mappedFieldType instanceof DateFieldMapper.DateFieldType dateFieldType
+            && dateFieldType.name().equals(DataStream.TIMESTAMP_FIELD_NAME)) {
             timestampFieldType = dateFieldType;
         }
         mappedFieldType = mapperService.fieldType(IndexMetadata.EVENT_INGESTED_FIELD_NAME);
-        if (mappedFieldType instanceof DateFieldMapper.DateFieldType dateFieldType) {
+        if (mappedFieldType instanceof DateFieldMapper.DateFieldType dateFieldType
+            && dateFieldType.name().equals(IndexMetadata.EVENT_INGESTED_FIELD_NAME)) {
             eventIngestedFieldType = dateFieldType;
         }
         if (timestampFieldType == null && eventIngestedFieldType == null) {
@@ -15,6 +15,7 @@
 import org.elasticsearch.action.search.SearchShardsGroup;
 import org.elasticsearch.action.search.SearchShardsRequest;
 import org.elasticsearch.action.search.SearchShardsResponse;
+import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.action.search.TransportSearchShardsAction;
 import org.elasticsearch.blobcache.shared.SharedBlobCacheService;
 import org.elasticsearch.cluster.metadata.DataStream;
@@ -1100,6 +1101,119 @@ public void testCanMatchSkipsPartiallyMountedIndicesWhenFrozenNodesUnavailable()
         }
     }

+    public void testTimestampAsAlias() throws Exception {
+        doTestCoordRewriteWithAliasField("@timestamp");
+    }
+
+    public void testEventIngestedAsAlias() throws Exception {
+        doTestCoordRewriteWithAliasField("event.ingested");
+    }
+
+    private void doTestCoordRewriteWithAliasField(String aliasFieldName) throws Exception {
+        internalCluster().startMasterOnlyNode();
+        internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
+        final String dataNodeHoldingRegularIndex = internalCluster().startDataOnlyNode();
+        final String dataNodeHoldingSearchableSnapshot = internalCluster().startDataOnlyNode();
+
+        String timestampFieldName = randomAlphaOfLengthBetween(3, 10);
+        String[] indices = new String[] { "index-0001", "index-0002" };
+        for (String index : indices) {
+            Settings extraSettings = Settings.builder()
+                .put(INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), dataNodeHoldingRegularIndex)
+                .build();
+
+            assertAcked(
+                indicesAdmin().prepareCreate(index)
+                    .setMapping(
+                        XContentFactory.jsonBuilder()
+                            .startObject()
+                            .startObject("properties")
+
+                            .startObject(timestampFieldName)
+                            .field("type", "date")
+                            .endObject()
+
+                            .startObject(aliasFieldName)
+                            .field("type", "alias")
+                            .field("path", timestampFieldName)
+                            .endObject()
+
+                            .endObject()
+                            .endObject()
+                    )
+                    .setSettings(indexSettingsNoReplicas(1).put(INDEX_SOFT_DELETES_SETTING.getKey(), true).put(extraSettings))
+            );
+        }
+        ensureGreen(indices);
+
+        for (String index : indices) {
+            final List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
+            for (int i = 0; i < 10; i++) {
+                indexRequestBuilders.add(prepareIndex(index).setSource(timestampFieldName, "2024-11-19T08:08:08Z"));
+            }
+            indexRandom(true, false, indexRequestBuilders);
+
+            assertThat(
+                indicesAdmin().prepareForceMerge(index).setOnlyExpungeDeletes(true).setFlush(true).get().getFailedShards(),
+                equalTo(0)
+            );
+            refresh(index);
+            forceMerge();
+        }
+
+        final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
+        createRepository(repositoryName, "mock");
+
+        final SnapshotId snapshotId = createSnapshot(repositoryName, "snapshot-1", List.of(indices[0])).snapshotId();
+        assertAcked(indicesAdmin().prepareDelete(indices[0]));
+
+        // Block the repository for the node holding the searchable snapshot shards
+        // to delay its restore
+        blockDataNode(repositoryName, dataNodeHoldingSearchableSnapshot);
+
+        // Force the searchable snapshot to be allocated in a particular node
+        Settings restoredIndexSettings = Settings.builder()
+            .put(INDEX_ROUTING_REQUIRE_GROUP_SETTING.getConcreteSettingForNamespace("_name").getKey(), dataNodeHoldingSearchableSnapshot)
+            .build();
+
+        String mountedIndex = indices[0] + "-mounted";
+        final MountSearchableSnapshotRequest mountRequest = new MountSearchableSnapshotRequest(
+            TEST_REQUEST_TIMEOUT,
+            mountedIndex,
+            repositoryName,
+            snapshotId.getName(),
+            indices[0],
+            restoredIndexSettings,
+            Strings.EMPTY_ARRAY,
+            false,
+            randomFrom(MountSearchableSnapshotRequest.Storage.values())
+        );
+        client().execute(MountSearchableSnapshotAction.INSTANCE, mountRequest).actionGet();
+
+        // Allow the searchable snapshots to be finally mounted
+        unblockNode(repositoryName, dataNodeHoldingSearchableSnapshot);
+        waitUntilRecoveryIsDone(mountedIndex);
+        ensureGreen(mountedIndex);
+
+        String[] fieldsToQuery = new String[] { timestampFieldName, aliasFieldName };
+        for (String fieldName : fieldsToQuery) {
+            RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery(fieldName).from("2024-11-01T00:00:00.000000000Z", true);
+            SearchRequest request = new SearchRequest().searchType(SearchType.QUERY_THEN_FETCH)
+                .source(new SearchSourceBuilder().query(rangeQuery));
+            if (randomBoolean()) {
+                // pre_filter_shard_size defaults to 1 because there are read-only indices in the mix. It does not hurt to force it though.
+                request.setPreFilterShardSize(1);
+            }
+            assertResponse(client().search(request), searchResponse -> {
+                assertThat(searchResponse.getSuccessfulShards(), equalTo(2));
+                assertThat(searchResponse.getFailedShards(), equalTo(0));
+                assertThat(searchResponse.getSkippedShards(), equalTo(0));
+                assertThat(searchResponse.getTotalShards(), equalTo(2));
+                assertThat(searchResponse.getHits().getTotalHits().value, equalTo(20L));
+            });
+        }
+    }
+
     private void createIndexWithTimestampAndEventIngested(String indexName, int numShards, Settings extraSettings) throws IOException {
         assertAcked(
             indicesAdmin().prepareCreate(indexName)
@@ -1148,8 +1262,7 @@ private void createIndexWithOnlyOneTimestampField(String timestampField, String
         ensureGreen(index);
     }

-    private void indexDocumentsWithOnlyOneTimestampField(String timestampField, String index, int docCount, String timestampTemplate)
-        throws Exception {
+    private void indexDocumentsWithOnlyOneTimestampField(String timestampField, String index, int docCount, String timestampTemplate) {
         final List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
         for (int i = 0; i < docCount; i++) {
             indexRequestBuilders.add(
@@ -1173,8 +1286,7 @@ private void indexDocumentsWithOnlyOneTimestampField(String timestampField, Stri
         forceMerge();
     }

-    private void indexDocumentsWithTimestampAndEventIngestedDates(String indexName, int docCount, String timestampTemplate)
-        throws Exception {
+    private void indexDocumentsWithTimestampAndEventIngestedDates(String indexName, int docCount, String timestampTemplate) {

         final List<IndexRequestBuilder> indexRequestBuilders = new ArrayList<>();
         for (int i = 0; i < docCount; i++) {
@@ -1211,7 +1323,7 @@ private void indexDocumentsWithTimestampAndEventIngestedDates(String indexName,
         forceMerge();
     }

-    private IndexMetadata getIndexMetadata(String indexName) {
+    private static IndexMetadata getIndexMetadata(String indexName) {
         return clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT)
             .clear()
             .setMetadata(true)
@@ -1222,7 +1334,7 @@ private IndexMetadata getIndexMetadata(String indexName) {
             .index(indexName);
     }

-    private void waitUntilRecoveryIsDone(String index) throws Exception {
+    private static void waitUntilRecoveryIsDone(String index) throws Exception {
         assertBusy(() -> {
             RecoveryResponse recoveryResponse = indicesAdmin().prepareRecoveries(index).get();
             assertThat(recoveryResponse.hasRecoveries(), equalTo(true));
