Skip to content

Commit

Permalink
[Transform] handle pit index not found error (elastic#81368)
Browse files Browse the repository at this point in the history
Do not fail the transform if pit search fails with index not found as a result of an index that got deleted via ILM, 
if that index is part of a search that selects indices using a wildcard, e.g. logs-*. If pit search fails, the search 
is retried using search without a pit context. The 2nd search might fail if the source targets an explicit index. 
In addition the usage of the pit API can not be disabled by transform.

fixes elastic#81252
relates elastic#81256
  • Loading branch information
Hendrik Muhs committed Dec 8, 2021
1 parent 7d493f6 commit a923f43
Show file tree
Hide file tree
Showing 15 changed files with 372 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class SettingsConfig implements ToXContentObject {
private static final ParseField DOCS_PER_SECOND = new ParseField("docs_per_second");
private static final ParseField DATES_AS_EPOCH_MILLIS = new ParseField("dates_as_epoch_millis");
private static final ParseField ALIGN_CHECKPOINTS = new ParseField("align_checkpoints");
private static final ParseField USE_PIT = new ParseField("use_point_in_time");
private static final int DEFAULT_MAX_PAGE_SEARCH_SIZE = -1;
private static final float DEFAULT_DOCS_PER_SECOND = -1F;

Expand All @@ -35,15 +36,19 @@ public class SettingsConfig implements ToXContentObject {
// use an integer as we need to code 4 states: true, false, null (unchanged), default (defined server side)
private static final int DEFAULT_ALIGN_CHECKPOINTS = -1;

// use an integer as we need to code 4 states: true, false, null (unchanged), default (defined server side)
private static final int DEFAULT_USE_PIT = -1;

private final Integer maxPageSearchSize;
private final Float docsPerSecond;
private final Integer datesAsEpochMillis;
private final Integer alignCheckpoints;
private final Integer usePit;

private static final ConstructingObjectParser<SettingsConfig, Void> PARSER = new ConstructingObjectParser<>(
"settings_config",
true,
args -> new SettingsConfig((Integer) args[0], (Float) args[1], (Integer) args[2], (Integer) args[3])
args -> new SettingsConfig((Integer) args[0], (Float) args[1], (Integer) args[2], (Integer) args[3], (Integer) args[4])
);

static {
Expand All @@ -63,17 +68,25 @@ public class SettingsConfig implements ToXContentObject {
ALIGN_CHECKPOINTS,
ValueType.BOOLEAN_OR_NULL
);
// this boolean requires 4 possible values: true, false, not_specified, default, therefore using a custom parser
PARSER.declareField(
optionalConstructorArg(),
p -> p.currentToken() == XContentParser.Token.VALUE_NULL ? DEFAULT_USE_PIT : p.booleanValue() ? 1 : 0,
USE_PIT,
ValueType.BOOLEAN_OR_NULL
);
}

public static SettingsConfig fromXContent(final XContentParser parser) {
return PARSER.apply(parser, null);
}

SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Integer datesAsEpochMillis, Integer alignCheckpoints) {
SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Integer datesAsEpochMillis, Integer alignCheckpoints, Integer usePit) {
this.maxPageSearchSize = maxPageSearchSize;
this.docsPerSecond = docsPerSecond;
this.datesAsEpochMillis = datesAsEpochMillis;
this.alignCheckpoints = alignCheckpoints;
this.usePit = usePit;
}

@Override
Expand Down Expand Up @@ -107,6 +120,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(ALIGN_CHECKPOINTS.getPreferredName(), alignCheckpoints > 0 ? true : false);
}
}
if (usePit != null) {
if (usePit.equals(DEFAULT_USE_PIT)) {
builder.field(USE_PIT.getPreferredName(), (Boolean) null);
} else {
builder.field(USE_PIT.getPreferredName(), usePit > 0 ? true : false);
}
}
builder.endObject();
return builder;
}
Expand All @@ -127,6 +147,10 @@ public Boolean getAlignCheckpoints() {
return alignCheckpoints != null ? alignCheckpoints > 0 : null;
}

public Boolean getUsePit() {
return usePit != null ? usePit > 0 : null;
}

@Override
public boolean equals(Object other) {
if (other == this) {
Expand All @@ -140,12 +164,13 @@ public boolean equals(Object other) {
return Objects.equals(maxPageSearchSize, that.maxPageSearchSize)
&& Objects.equals(docsPerSecond, that.docsPerSecond)
&& Objects.equals(datesAsEpochMillis, that.datesAsEpochMillis)
&& Objects.equals(alignCheckpoints, that.alignCheckpoints);
&& Objects.equals(alignCheckpoints, that.alignCheckpoints)
&& Objects.equals(usePit, that.usePit);
}

@Override
public int hashCode() {
return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints);
return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit);
}

public static Builder builder() {
Expand All @@ -157,6 +182,7 @@ public static class Builder {
private Float docsPerSecond;
private Integer datesAsEpochMillis;
private Integer alignCheckpoints;
private Integer usePit;

/**
* Sets the paging maximum paging maxPageSearchSize that transform can use when
Expand Down Expand Up @@ -215,8 +241,23 @@ public Builder setAlignCheckpoints(Boolean alignCheckpoints) {
return this;
}

/**
* Whether the point in time API should be used for search.
* Point in time is a more resource friendly way to query. It is used by default. In case of problems
* you can disable the point in time API usage with this setting.
*
* An explicit `null` resets to default.
*
* @param usePit true if the point in time API should be used.
* @return the {@link Builder} with usePit set.
*/
public Builder setUsePit(Boolean usePit) {
this.usePit = usePit == null ? DEFAULT_USE_PIT : usePit ? 1 : 0;
return this;
}

public SettingsConfig build() {
return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints);
return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public static SettingsConfig randomSettingsConfig() {
randomBoolean() ? null : randomIntBetween(10, 10_000),
randomBoolean() ? null : randomFloat(),
randomBoolean() ? null : randomIntBetween(-1, 1),
randomBoolean() ? null : randomIntBetween(-1, 1),
randomBoolean() ? null : randomIntBetween(-1, 1)
);
}
Expand Down Expand Up @@ -74,6 +75,7 @@ public void testExplicitNullOnWriteParser() throws IOException {
assertNull(settingsAsMap.getOrDefault("docs_per_second", "not_set"));
assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("align_checkpoints", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("use_point_in_time", "not_set"), equalTo("not_set"));

config = fromString("{\"dates_as_epoch_millis\" : null}");
assertFalse(config.getDatesAsEpochMillis());
Expand All @@ -83,6 +85,7 @@ public void testExplicitNullOnWriteParser() throws IOException {
assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set"));
assertNull(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"));
assertThat(settingsAsMap.getOrDefault("align_checkpoints", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("use_point_in_time", "not_set"), equalTo("not_set"));

config = fromString("{\"align_checkpoints\" : null}");
assertFalse(config.getAlignCheckpoints());
Expand All @@ -92,6 +95,10 @@ public void testExplicitNullOnWriteParser() throws IOException {
assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set"));
assertNull(settingsAsMap.getOrDefault("align_checkpoints", "not_set"));
assertThat(settingsAsMap.getOrDefault("use_point_in_time", "not_set"), equalTo("not_set"));

config = fromString("{\"use_point_in_time\" : null}");
assertFalse(config.getUsePit());
}

public void testExplicitNullOnWriteBuilder() throws IOException {
Expand All @@ -104,6 +111,7 @@ public void testExplicitNullOnWriteBuilder() throws IOException {
assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("align_checkpoints", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("use_point_in_time", "not_set"), equalTo("not_set"));

SettingsConfig emptyConfig = new SettingsConfig.Builder().build();
assertNull(emptyConfig.getMaxPageSearchSize());
Expand All @@ -121,6 +129,7 @@ public void testExplicitNullOnWriteBuilder() throws IOException {
assertNull(settingsAsMap.getOrDefault("docs_per_second", "not_set"));
assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("align_checkpoints", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("use_point_in_time", "not_set"), equalTo("not_set"));

config = new SettingsConfig.Builder().setDatesAsEpochMillis(null).build();
// returns false, however it's `null` as in "use default", checked next
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public static org.elasticsearch.xpack.core.transform.transforms.SettingsConfig r
randomBoolean() ? null : randomIntBetween(10, 10_000),
randomBoolean() ? null : randomFloat(),
randomBoolean() ? null : randomIntBetween(0, 1),
randomBoolean() ? null : randomIntBetween(0, 1),
randomBoolean() ? null : randomIntBetween(0, 1)
);
}
Expand All @@ -36,6 +37,7 @@ public static void assertHlrcEquals(
assertEquals(serverTestInstance.getDocsPerSecond(), clientInstance.getDocsPerSecond());
assertEquals(serverTestInstance.getDatesAsEpochMillis(), clientInstance.getDatesAsEpochMillis());
assertEquals(serverTestInstance.getAlignCheckpoints(), clientInstance.getAlignCheckpoints());
assertEquals(serverTestInstance.getUsePit(), clientInstance.getUsePit());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public final class TransformField {
public static final ParseField DOCS_PER_SECOND = new ParseField("docs_per_second");
public static final ParseField DATES_AS_EPOCH_MILLIS = new ParseField("dates_as_epoch_millis");
public static final ParseField ALIGN_CHECKPOINTS = new ParseField("align_checkpoints");
public static final ParseField USE_PIT = new ParseField("use_point_in_time");
public static final ParseField FIELD = new ParseField("field");
public static final ParseField SYNC = new ParseField("sync");
public static final ParseField TIME = new ParseField("time");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ public class SettingsConfig implements Writeable, ToXContentObject {
private static final float DEFAULT_DOCS_PER_SECOND = -1F;
private static final int DEFAULT_DATES_AS_EPOCH_MILLIS = -1;
private static final int DEFAULT_ALIGN_CHECKPOINTS = -1;
private static final int DEFAULT_USE_PIT = -1;

private static ConstructingObjectParser<SettingsConfig, Void> createParser(boolean lenient) {
ConstructingObjectParser<SettingsConfig, Void> parser = new ConstructingObjectParser<>(
"transform_config_settings",
lenient,
args -> new SettingsConfig((Integer) args[0], (Float) args[1], (Integer) args[2], (Integer) args[3])
args -> new SettingsConfig((Integer) args[0], (Float) args[1], (Integer) args[2], (Integer) args[3], (Integer) args[4])
);
parser.declareIntOrNull(optionalConstructorArg(), DEFAULT_MAX_PAGE_SEARCH_SIZE, TransformField.MAX_PAGE_SEARCH_SIZE);
parser.declareFloatOrNull(optionalConstructorArg(), DEFAULT_DOCS_PER_SECOND, TransformField.DOCS_PER_SECOND);
Expand All @@ -61,32 +62,54 @@ private static ConstructingObjectParser<SettingsConfig, Void> createParser(boole
TransformField.ALIGN_CHECKPOINTS,
ValueType.BOOLEAN_OR_NULL
);
// this boolean requires 4 possible values: true, false, not_specified, default, therefore using a custom parser
parser.declareField(
optionalConstructorArg(),
p -> p.currentToken() == XContentParser.Token.VALUE_NULL ? DEFAULT_USE_PIT : p.booleanValue() ? 1 : 0,
TransformField.USE_PIT,
ValueType.BOOLEAN_OR_NULL
);
return parser;
}

private final Integer maxPageSearchSize;
private final Float docsPerSecond;
private final Integer datesAsEpochMillis;
private final Integer alignCheckpoints;
private final Integer usePit;

public SettingsConfig() {
this(null, null, (Integer) null, (Integer) null);
this(null, null, (Integer) null, (Integer) null, (Integer) null);
}

public SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Boolean datesAsEpochMillis, Boolean alignCheckpoints) {
public SettingsConfig(
Integer maxPageSearchSize,
Float docsPerSecond,
Boolean datesAsEpochMillis,
Boolean alignCheckpoints,
Boolean usePit
) {
this(
maxPageSearchSize,
docsPerSecond,
datesAsEpochMillis == null ? null : datesAsEpochMillis ? 1 : 0,
alignCheckpoints == null ? null : alignCheckpoints ? 1 : 0
alignCheckpoints == null ? null : alignCheckpoints ? 1 : 0,
usePit == null ? null : usePit ? 1 : 0
);
}

public SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Integer datesAsEpochMillis, Integer alignCheckpoints) {
public SettingsConfig(
Integer maxPageSearchSize,
Float docsPerSecond,
Integer datesAsEpochMillis,
Integer alignCheckpoints,
Integer usePit
) {
this.maxPageSearchSize = maxPageSearchSize;
this.docsPerSecond = docsPerSecond;
this.datesAsEpochMillis = datesAsEpochMillis;
this.alignCheckpoints = alignCheckpoints;
this.usePit = usePit;
}

public SettingsConfig(final StreamInput in) throws IOException {
Expand All @@ -102,6 +125,11 @@ public SettingsConfig(final StreamInput in) throws IOException {
} else {
this.alignCheckpoints = DEFAULT_ALIGN_CHECKPOINTS;
}
if (in.getVersion().onOrAfter(Version.V_8_1_0)) { // todo: V_7_16_1
this.usePit = in.readOptionalInt();
} else {
this.usePit = DEFAULT_USE_PIT;
}
}

public Integer getMaxPageSearchSize() {
Expand All @@ -128,6 +156,14 @@ public Integer getAlignCheckpointsForUpdate() {
return alignCheckpoints;
}

public Boolean getUsePit() {
return usePit != null ? (usePit > 0) || (usePit == DEFAULT_USE_PIT) : null;
}

public Integer getUsePitForUpdate() {
return usePit;
}

public ActionRequestValidationException validate(ActionRequestValidationException validationException) {
if (maxPageSearchSize != null && (maxPageSearchSize < 10 || maxPageSearchSize > MultiBucketConsumerService.DEFAULT_MAX_BUCKETS)) {
validationException = addValidationError(
Expand All @@ -154,6 +190,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_15_0)) {
out.writeOptionalInt(alignCheckpoints);
}
if (out.getVersion().onOrAfter(Version.V_8_1_0)) { // todo: V_7_16_1
out.writeOptionalInt(usePit);
}
}

@Override
Expand All @@ -172,6 +211,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (alignCheckpoints != null && (alignCheckpoints.equals(DEFAULT_ALIGN_CHECKPOINTS) == false)) {
builder.field(TransformField.ALIGN_CHECKPOINTS.getPreferredName(), alignCheckpoints > 0 ? true : false);
}
if (usePit != null && (usePit.equals(DEFAULT_USE_PIT) == false)) {
builder.field(TransformField.USE_PIT.getPreferredName(), usePit > 0 ? true : false);
}
builder.endObject();
return builder;
}
Expand All @@ -189,12 +231,13 @@ public boolean equals(Object other) {
return Objects.equals(maxPageSearchSize, that.maxPageSearchSize)
&& Objects.equals(docsPerSecond, that.docsPerSecond)
&& Objects.equals(datesAsEpochMillis, that.datesAsEpochMillis)
&& Objects.equals(alignCheckpoints, that.alignCheckpoints);
&& Objects.equals(alignCheckpoints, that.alignCheckpoints)
&& Objects.equals(usePit, that.usePit);
}

@Override
public int hashCode() {
return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints);
return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit);
}

@Override
Expand All @@ -211,6 +254,7 @@ public static class Builder {
private Float docsPerSecond;
private Integer datesAsEpochMillis;
private Integer alignCheckpoints;
private Integer usePit;

/**
* Default builder
Expand All @@ -227,6 +271,7 @@ public Builder(SettingsConfig base) {
this.docsPerSecond = base.docsPerSecond;
this.datesAsEpochMillis = base.datesAsEpochMillis;
this.alignCheckpoints = base.alignCheckpoints;
this.usePit = base.usePit;
}

/**
Expand Down Expand Up @@ -286,6 +331,21 @@ public Builder setAlignCheckpoints(Boolean alignCheckpoints) {
return this;
}

/**
* Whether the point in time API should be used for search.
* Point in time is a more resource friendly way to query. It is used per default. In case of problems
* you can disable the point in time API usage with this setting.
*
* An explicit `null` resets to default.
*
* @param usePit true if the point in time API should be used.
* @return the {@link Builder} with usePit set.
*/
public Builder setUsePit(Boolean usePit) {
this.usePit = usePit == null ? DEFAULT_USE_PIT : usePit ? 1 : 0;
return this;
}

/**
* Update settings according to given settings config.
*
Expand Down Expand Up @@ -313,12 +373,15 @@ public Builder update(SettingsConfig update) {
? null
: update.getAlignCheckpointsForUpdate();
}
if (update.getUsePitForUpdate() != null) {
this.usePit = update.getUsePitForUpdate().equals(DEFAULT_USE_PIT) ? null : update.getUsePitForUpdate();
}

return this;
}

public SettingsConfig build() {
return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints);
return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit);
}
}
}
Loading

0 comments on commit a923f43

Please sign in to comment.