Revert "Search coordinator uses event.ingested in cluster state to do…
Browse files Browse the repository at this point in the history
… rewrites (elastic#110352)" (elastic#110881)

This reverts commit d45d164.
quux00 authored Jul 15, 2024
1 parent 91fb6e9 commit 5c98098
Showing 12 changed files with 173 additions and 1,034 deletions.
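
For orientation before the file-by-file diff: the reverted change let the search coordinator use per-index timestamp ranges held in cluster state (for event.ingested as well as @timestamp) to rewrite out-of-range range queries and skip shards before the query phase. The following is a minimal, self-contained Java sketch of that skip decision; the class, record, and method names are illustrative stand-ins, not Elasticsearch types.

// Simplified model of the coordinator "can-match" decision: if the queried time
// range does not overlap the [min, max] range recorded for an index's timestamp
// field, the range query can be rewritten to match-none and the shard skipped.
// Illustrative code only; these names are not Elasticsearch classes.
public final class CoordinatorSkipSketch {

    record TimestampRange(long minMillis, long maxMillis) {
        boolean isDisjointFrom(long fromMillis, long toMillis) {
            // Disjoint when the query range lies entirely before or after the recorded range.
            return toMillis < minMillis || fromMillis > maxMillis;
        }
    }

    public static void main(String[] args) {
        // Range recorded in cluster state for one index (e.g. for @timestamp).
        TimestampRange indexRange = new TimestampRange(1_700_000_000_000L, 1_700_086_400_000L);

        // The query asks for documents well after the index's newest timestamp.
        long queryFrom = 1_705_000_000_000L;
        long queryTo = 1_705_086_400_000L;

        boolean skipShard = indexRange.isDisjointFrom(queryFrom, queryTo);
        System.out.println(skipShard
            ? "DISJOINT: rewrite the range query to match_none and skip the shard"
            : "INTERSECTS: keep the range query and search the shard");
    }
}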
5 changes: 0 additions & 5 deletions docs/changelog/110352.yaml

This file was deleted.

@@ -61,7 +61,7 @@ public void testGetTimestampFieldTypeForTsdbDataStream() throws IOException {
DocWriteResponse indexResponse = indexDoc();

var indicesService = getInstanceFromNode(IndicesService.class);
var result = indicesService.getTimestampFieldTypeInfo(indexResponse.getShardId().getIndex());
var result = indicesService.getTimestampFieldType(indexResponse.getShardId().getIndex());
assertThat(result, notNullValue());
}

@@ -70,7 +70,7 @@ public void testGetTimestampFieldTypeForDataStream() throws IOException {
DocWriteResponse indexResponse = indexDoc();

var indicesService = getInstanceFromNode(IndicesService.class);
var result = indicesService.getTimestampFieldTypeInfo(indexResponse.getShardId().getIndex());
var result = indicesService.getTimestampFieldType(indexResponse.getShardId().getIndex());
assertThat(result, nullValue());
}

@@ -9,14 +9,11 @@
package org.elasticsearch.index.query;

import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MappingLookup;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.indices.DateFieldRangeInfo;
import org.elasticsearch.xcontent.XContentParserConfiguration;

import java.util.Collections;
@@ -26,24 +23,19 @@
* Context object used to rewrite {@link QueryBuilder} instances into simplified version in the coordinator.
* Instances of this object rely on information stored in the {@code IndexMetadata} for certain indices.
* Right now this context object is able to rewrite range queries that include a known timestamp field
* (i.e. the timestamp field for DataStreams or the 'event.ingested' field in ECS) into a MatchNoneQueryBuilder
* and skip the shards that don't hold queried data. See IndexMetadata for more details.
* (i.e. the timestamp field for DataStreams) into a MatchNoneQueryBuilder and skip the shards that
* don't hold queried data. See IndexMetadata#getTimestampRange() for more details
*/
public class CoordinatorRewriteContext extends QueryRewriteContext {
private final DateFieldRangeInfo dateFieldRangeInfo;
private final IndexLongFieldRange indexLongFieldRange;
private final DateFieldMapper.DateFieldType timestampFieldType;

/**
* Context for coordinator search rewrites based on time ranges for the @timestamp field and/or 'event.ingested' field
* @param parserConfig
* @param client
* @param nowInMillis
* @param dateFieldRangeInfo range and field type info for @timestamp and 'event.ingested'
*/
public CoordinatorRewriteContext(
XContentParserConfiguration parserConfig,
Client client,
LongSupplier nowInMillis,
DateFieldRangeInfo dateFieldRangeInfo
IndexLongFieldRange indexLongFieldRange,
DateFieldMapper.DateFieldType timestampFieldType
) {
super(
parserConfig,
@@ -61,98 +53,29 @@ public CoordinatorRewriteContext(
null,
null
);
this.dateFieldRangeInfo = dateFieldRangeInfo;
this.indexLongFieldRange = indexLongFieldRange;
this.timestampFieldType = timestampFieldType;
}

/**
* Get min timestamp for either '@timestamp' or 'event.ingested' fields. Any other field
* passed in will cause an {@link IllegalArgumentException} to be thrown, as these are the only
* two fields supported for coordinator rewrites (based on time range).
* @param fieldName Must be DataStream.TIMESTAMP_FIELD_NAME or IndexMetadata.EVENT_INGESTED_FIELD_NAME
* @return min timestamp for the field from IndexMetadata in cluster state.
*/
long getMinTimestamp(String fieldName) {
if (DataStream.TIMESTAMP_FIELD_NAME.equals(fieldName)) {
return dateFieldRangeInfo.getTimestampRange().getMin();
} else if (IndexMetadata.EVENT_INGESTED_FIELD_NAME.equals(fieldName)) {
return dateFieldRangeInfo.getEventIngestedRange().getMin();
} else {
throw new IllegalArgumentException(
Strings.format(
"Only [%s] or [%s] fields are supported for min timestamp coordinator rewrites, but got: [%s]",
DataStream.TIMESTAMP_FIELD_NAME,
IndexMetadata.EVENT_INGESTED_FIELD_NAME,
fieldName
)
);
}
long getMinTimestamp() {
return indexLongFieldRange.getMin();
}

/**
* Get max timestamp for either '@timestamp' or 'event.ingested' fields. Any other field
* passed in will cause an {@link IllegalArgumentException} to be thrown, as these are the only
* two fields supported for coordinator rewrites (based on time range).
* @param fieldName Must be DataStream.TIMESTAMP_FIELD_NAME or IndexMetadata.EVENT_INGESTED_FIELD_NAME
* @return max timestamp for the field from IndexMetadata in cluster state.
*/
long getMaxTimestamp(String fieldName) {
if (DataStream.TIMESTAMP_FIELD_NAME.equals(fieldName)) {
return dateFieldRangeInfo.getTimestampRange().getMax();
} else if (IndexMetadata.EVENT_INGESTED_FIELD_NAME.equals(fieldName)) {
return dateFieldRangeInfo.getEventIngestedRange().getMax();
} else {
throw new IllegalArgumentException(
Strings.format(
"Only [%s] or [%s] fields are supported for max timestamp coordinator rewrites, but got: [%s]",
DataStream.TIMESTAMP_FIELD_NAME,
IndexMetadata.EVENT_INGESTED_FIELD_NAME,
fieldName
)
);
}
long getMaxTimestamp() {
return indexLongFieldRange.getMax();
}

/**
* Determine whether either '@timestamp' or 'event.ingested' fields has useful timestamp ranges
* stored in cluster state for this context.
* Any other fieldname will cause an {@link IllegalArgumentException} to be thrown, as these are the only
* two fields supported for coordinator rewrites (based on time range).
* @param fieldName Must be DataStream.TIMESTAMP_FIELD_NAME or IndexMetadata.EVENT_INGESTED_FIELD_NAME
* @return min timestamp for the field from IndexMetadata in cluster state.
*/
boolean hasTimestampData(String fieldName) {
if (DataStream.TIMESTAMP_FIELD_NAME.equals(fieldName)) {
return dateFieldRangeInfo.getTimestampRange().isComplete()
&& dateFieldRangeInfo.getTimestampRange() != IndexLongFieldRange.EMPTY;
} else if (IndexMetadata.EVENT_INGESTED_FIELD_NAME.equals(fieldName)) {
return dateFieldRangeInfo.getEventIngestedRange().isComplete()
&& dateFieldRangeInfo.getEventIngestedRange() != IndexLongFieldRange.EMPTY;
} else {
throw new IllegalArgumentException(
Strings.format(
"Only [%s] or [%s] fields are supported for min/max timestamp coordinator rewrites, but got: [%s]",
DataStream.TIMESTAMP_FIELD_NAME,
IndexMetadata.EVENT_INGESTED_FIELD_NAME,
fieldName
)
);
}
boolean hasTimestampData() {
return indexLongFieldRange.isComplete() && indexLongFieldRange != IndexLongFieldRange.EMPTY;
}

/**
* @param fieldName Get MappedFieldType for either '@timestamp' or 'event.ingested' fields.
* @return min timestamp for the field from IndexMetadata in cluster state or null if fieldName was not
* DataStream.TIMESTAMP_FIELD_NAME or IndexMetadata.EVENT_INGESTED_FIELD_NAME.
*/
@Nullable
public MappedFieldType getFieldType(String fieldName) {
if (DataStream.TIMESTAMP_FIELD_NAME.equals(fieldName)) {
return dateFieldRangeInfo.getTimestampFieldType();
} else if (IndexMetadata.EVENT_INGESTED_FIELD_NAME.equals(fieldName)) {
return dateFieldRangeInfo.getEventIngestedFieldType();
} else {
if (fieldName.equals(timestampFieldType.name()) == false) {
return null;
}

return timestampFieldType;
}

@Override
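
The hunk above swaps a field-name-keyed lookup (the reverted elastic#110352 shape, which accepted either @timestamp or event.ingested) back to a context keyed on the single timestamp field. A small stand-alone sketch of the two accessor shapes, using plain Java stand-ins rather than the Elasticsearch classes:

// Stand-alone contrast of the two accessor shapes in this file's diff: the
// reverted code keyed min/max lookups by field name (@timestamp or event.ingested),
// the restored code tracks one timestamp range. Plain Java stand-ins only.
import java.util.Map;

public final class AccessorShapeSketch {

    // Restored shape: a single range, no field-name argument.
    static final class SingleFieldContext {
        private final long min;
        private final long max;
        SingleFieldContext(long min, long max) { this.min = min; this.max = max; }
        long getMinTimestamp() { return min; }
        long getMaxTimestamp() { return max; }
    }

    // Reverted shape: ranges looked up per supported field name, rejecting others.
    static final class MultiFieldContext {
        private final Map<String, long[]> rangesByField;
        MultiFieldContext(Map<String, long[]> rangesByField) { this.rangesByField = rangesByField; }
        long getMinTimestamp(String fieldName) {
            long[] range = rangesByField.get(fieldName);
            if (range == null) {
                throw new IllegalArgumentException("unsupported field for coordinator rewrite: " + fieldName);
            }
            return range[0];
        }
    }

    public static void main(String[] args) {
        SingleFieldContext restored = new SingleFieldContext(100L, 200L);
        MultiFieldContext reverted = new MultiFieldContext(Map.of("@timestamp", new long[] { 100L, 200L }));
        System.out.println(restored.getMinTimestamp() + " == " + reverted.getMinTimestamp("@timestamp"));
    }
}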
@@ -14,7 +14,6 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.shard.IndexLongFieldRange;
import org.elasticsearch.indices.DateFieldRangeInfo;
import org.elasticsearch.xcontent.XContentParserConfiguration;

import java.util.function.Function;
@@ -26,14 +25,14 @@ public class CoordinatorRewriteContextProvider {
private final Client client;
private final LongSupplier nowInMillis;
private final Supplier<ClusterState> clusterStateSupplier;
private final Function<Index, DateFieldRangeInfo> mappingSupplier;
private final Function<Index, DateFieldMapper.DateFieldType> mappingSupplier;

public CoordinatorRewriteContextProvider(
XContentParserConfiguration parserConfig,
Client client,
LongSupplier nowInMillis,
Supplier<ClusterState> clusterStateSupplier,
Function<Index, DateFieldRangeInfo> mappingSupplier
Function<Index, DateFieldMapper.DateFieldType> mappingSupplier
) {
this.parserConfig = parserConfig;
this.client = client;
@@ -50,33 +49,18 @@ public CoordinatorRewriteContext getCoordinatorRewriteContext(Index index) {
if (indexMetadata == null) {
return null;
}

DateFieldRangeInfo dateFieldRangeInfo = mappingSupplier.apply(index);
if (dateFieldRangeInfo == null) {
DateFieldMapper.DateFieldType dateFieldType = mappingSupplier.apply(index);
if (dateFieldType == null) {
return null;
}

DateFieldMapper.DateFieldType timestampFieldType = dateFieldRangeInfo.getTimestampFieldType();
DateFieldMapper.DateFieldType eventIngestedFieldType = dateFieldRangeInfo.getEventIngestedFieldType();
IndexLongFieldRange timestampRange = indexMetadata.getTimestampRange();
IndexLongFieldRange eventIngestedRange = indexMetadata.getEventIngestedRange();

if (timestampRange.containsAllShardRanges() == false) {
// if @timestamp range is not present or not ready in cluster state, fallback to using time series range (if present)
timestampRange = indexMetadata.getTimeSeriesTimestampRange(timestampFieldType);
// if timestampRange in the time series is null AND the eventIngestedRange is not ready for use, return null (no coord rewrite)
if (timestampRange == null && eventIngestedRange.containsAllShardRanges() == false) {
timestampRange = indexMetadata.getTimeSeriesTimestampRange(dateFieldType);
if (timestampRange == null) {
return null;
}
}

// the DateFieldRangeInfo from the mappingSupplier only has field types, but not ranges
// so create a new object with ranges pulled from cluster state
return new CoordinatorRewriteContext(
parserConfig,
client,
nowInMillis,
new DateFieldRangeInfo(timestampFieldType, timestampRange, eventIngestedFieldType, eventIngestedRange)
);
return new CoordinatorRewriteContext(parserConfig, client, nowInMillis, timestampRange, dateFieldType);
}
}
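
The restored provider logic above falls back to the time-series (TSDB) timestamp range when the @timestamp range in cluster state is not yet complete, and gives up on coordinator rewriting when neither range is usable. Below is a hedged, stand-alone sketch of that decision; Range and the method names are illustrative, not the Elasticsearch API.

// Minimal sketch of the fallback restored by this revert: use the @timestamp
// range from cluster state only if every shard has reported one, otherwise fall
// back to the time-series (TSDB) range, and skip coordinator rewriting when
// neither is available. Plain Java stand-ins; not the Elasticsearch types.
import java.util.Optional;

public final class ProviderFallbackSketch {

    record Range(long min, long max, boolean containsAllShardRanges) {}

    static Optional<Range> resolveTimestampRange(Range clusterStateRange, Range timeSeriesRange) {
        if (clusterStateRange != null && clusterStateRange.containsAllShardRanges()) {
            return Optional.of(clusterStateRange);
        }
        // Fallback: a TSDB index declares its timestamp bounds up front; may be absent.
        return Optional.ofNullable(timeSeriesRange);
    }

    public static void main(String[] args) {
        Range incomplete = new Range(0L, 0L, false);
        Range tsdbRange = new Range(1_700_000_000_000L, 1_700_086_400_000L, true);

        System.out.println(resolveTimestampRange(incomplete, tsdbRange)); // falls back to the TSDB range
        System.out.println(resolveTimestampRange(incomplete, null));      // empty: no coordinator rewrite
    }
}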
@@ -436,11 +436,11 @@ public String getWriteableName() {
protected MappedFieldType.Relation getRelation(final CoordinatorRewriteContext coordinatorRewriteContext) {
final MappedFieldType fieldType = coordinatorRewriteContext.getFieldType(fieldName);
if (fieldType instanceof final DateFieldMapper.DateFieldType dateFieldType) {
if (coordinatorRewriteContext.hasTimestampData(fieldName) == false) {
if (coordinatorRewriteContext.hasTimestampData() == false) {
return MappedFieldType.Relation.DISJOINT;
}
long minTimestamp = coordinatorRewriteContext.getMinTimestamp(fieldName);
long maxTimestamp = coordinatorRewriteContext.getMaxTimestamp(fieldName);
long minTimestamp = coordinatorRewriteContext.getMinTimestamp();
long maxTimestamp = coordinatorRewriteContext.getMaxTimestamp();
DateMathParser dateMathParser = getForceDateParser();
return dateFieldType.isFieldWithinQuery(
minTimestamp,
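
For context on how the relation computed in getRelation is consumed: a DISJOINT result lets the range query collapse to a match-none query so the coordinator can skip the shard. The sketch below models that step with plain Java stand-ins; the type names are not the Elasticsearch classes.

// Hedged sketch of how a DISJOINT relation is consumed during the coordinator
// rewrite: the query collapses to a match-none placeholder so the shard can be
// skipped. Enum, interface, and record names are stand-ins, not Elasticsearch types.
public final class RelationRewriteSketch {

    enum Relation { WITHIN, INTERSECTS, DISJOINT }

    sealed interface Query permits RangeQuery, MatchNoneQuery {}
    record RangeQuery(String field, long from, long to) implements Query {}
    record MatchNoneQuery(String reason) implements Query {}

    static Query rewrite(RangeQuery query, Relation relation) {
        // DISJOINT means the index's recorded timestamp range cannot overlap the
        // query bounds, so no document on this shard can match.
        return relation == Relation.DISJOINT
            ? new MatchNoneQuery("range on " + query.field() + " is outside the index's timestamp range")
            : query;
    }

    public static void main(String[] args) {
        RangeQuery query = new RangeQuery("@timestamp", 1_705_000_000_000L, 1_705_086_400_000L);
        System.out.println(rewrite(query, Relation.DISJOINT));   // collapses to MatchNoneQuery
        System.out.println(rewrite(query, Relation.INTERSECTS)); // RangeQuery left unchanged
    }
}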

This file was deleted.

19 changes: 6 additions & 13 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
@@ -98,6 +98,7 @@
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperMetrics;
import org.elasticsearch.index.mapper.MapperRegistry;
@@ -1763,13 +1764,7 @@ public DataRewriteContext getDataRewriteContext(LongSupplier nowInMillis) {
}

public CoordinatorRewriteContextProvider getCoordinatorRewriteContextProvider(LongSupplier nowInMillis) {
return new CoordinatorRewriteContextProvider(
parserConfig,
client,
nowInMillis,
clusterService::state,
this::getTimestampFieldTypeInfo
);
return new CoordinatorRewriteContextProvider(parserConfig, client, nowInMillis, clusterService::state, this::getTimestampFieldType);
}

/**
Expand Down Expand Up @@ -1859,16 +1854,14 @@ public boolean allPendingDanglingIndicesWritten() {
}

/**
* @return DateFieldRangeInfo holding the field types of the {@code @timestamp} and {@code event.ingested} fields of the index.
* or {@code null} if:
* @return the field type of the {@code @timestamp} field of the given index, or {@code null} if:
* - the index is not found,
* - the field is not found, or
* - the mapping is not known yet, or
* - the index does not have a useful timestamp field.
* - the field is not a timestamp field.
*/
@Nullable
public DateFieldRangeInfo getTimestampFieldTypeInfo(Index index) {
return timestampFieldMapperService.getTimestampFieldTypeMap(index);
public DateFieldMapper.DateFieldType getTimestampFieldType(Index index) {
return timestampFieldMapperService.getTimestampFieldType(index);
}

public IndexScopedSettings getIndexScopedSettings() {
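
The restored javadoc for getTimestampFieldType spells out a null contract: a null result means the index, mapping, or timestamp field is not usable, and the provider then simply produces no coordinator rewrite context for that index. A tiny illustrative stand-in of that contract (not the Elasticsearch API):

// Illustrative only: models the "null means no coordinator rewrite" contract of
// the per-index timestamp field lookup. Not Elasticsearch classes.
import java.util.Map;

public final class NullableLookupSketch {

    static String describe(Map<String, String> timestampFieldByIndex, String index) {
        String field = timestampFieldByIndex.get(index); // may be null for unknown indices
        return field == null
            ? index + ": timestamp mapping unknown, no coordinator rewrite"
            : index + ": coordinator rewrite possible on " + field;
    }

    public static void main(String[] args) {
        Map<String, String> timestampFieldByIndex = Map.of("logs-000001", "@timestamp");
        System.out.println(describe(timestampFieldByIndex, "logs-000001"));
        System.out.println(describe(timestampFieldByIndex, "metrics-000001"));
    }
}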
