From f1c722224af951cbeda17b6f9ef8cd4d47e16d46 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 8 Dec 2021 08:36:20 +0100 Subject: [PATCH] [Transform] handle pit index not found error (#81368) Do not fail the transform if pit search fails with index not found as a result of an index that got deleted via ILM, if that index is part of a search that selects indices using a wildcard, e.g. logs-*. If pit search fails, the search is retried using search without a pit context. The 2nd search might fail if the source targets an explicit index. In addition the usage of the pit API can not be disabled by transform. fixes #81252 relates #81256 --- .../transform/transforms/SettingsConfig.java | 51 +++++++- .../transforms/SettingsConfigTests.java | 9 ++ .../transforms/hlrc/SettingsConfigTests.java | 2 + .../xpack/core/transform/TransformField.java | 1 + .../transform/transforms/SettingsConfig.java | 79 +++++++++++-- .../transform/transforms/TransformConfig.java | 20 +++- .../transforms/SettingsConfigTests.java | 33 +++++- .../TransformConfigUpdateTests.java | 28 ++++- .../schema/transform_config.schema.json | 6 + .../transforms/ClientTransformIndexer.java | 35 ++++++ .../transforms/TransformIndexer.java | 23 +++- .../action/TransformUpdaterTests.java | 8 +- .../ClientTransformIndexerTests.java | 109 +++++++++++++++++- .../TransformIndexerFailureHandlingTests.java | 6 +- .../TransformIndexerStateTests.java | 2 +- 15 files changed, 372 insertions(+), 40 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/SettingsConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/SettingsConfig.java index 79adf4a132b87..02d6aa5bc6924 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/SettingsConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/SettingsConfig.java @@ -26,6 +26,7 @@ public class SettingsConfig implements ToXContentObject { private static final ParseField DOCS_PER_SECOND = new ParseField("docs_per_second"); private static final ParseField DATES_AS_EPOCH_MILLIS = new ParseField("dates_as_epoch_millis"); private static final ParseField ALIGN_CHECKPOINTS = new ParseField("align_checkpoints"); + private static final ParseField USE_PIT = new ParseField("use_point_in_time"); private static final int DEFAULT_MAX_PAGE_SEARCH_SIZE = -1; private static final float DEFAULT_DOCS_PER_SECOND = -1F; @@ -35,15 +36,19 @@ public class SettingsConfig implements ToXContentObject { // use an integer as we need to code 4 states: true, false, null (unchanged), default (defined server side) private static final int DEFAULT_ALIGN_CHECKPOINTS = -1; + // use an integer as we need to code 4 states: true, false, null (unchanged), default (defined server side) + private static final int DEFAULT_USE_PIT = -1; + private final Integer maxPageSearchSize; private final Float docsPerSecond; private final Integer datesAsEpochMillis; private final Integer alignCheckpoints; + private final Integer usePit; private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "settings_config", true, - args -> new SettingsConfig((Integer) args[0], (Float) args[1], (Integer) args[2], (Integer) args[3]) + args -> new SettingsConfig((Integer) args[0], (Float) args[1], (Integer) args[2], (Integer) args[3], (Integer) args[4]) ); static { @@ -63,17 +68,25 @@ public class SettingsConfig implements ToXContentObject { ALIGN_CHECKPOINTS, ValueType.BOOLEAN_OR_NULL ); + // this boolean requires 4 possible values: true, false, not_specified, default, therefore using a custom parser + PARSER.declareField( + optionalConstructorArg(), + p -> p.currentToken() == XContentParser.Token.VALUE_NULL ? DEFAULT_USE_PIT : p.booleanValue() ? 1 : 0, + USE_PIT, + ValueType.BOOLEAN_OR_NULL + ); } public static SettingsConfig fromXContent(final XContentParser parser) { return PARSER.apply(parser, null); } - SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Integer datesAsEpochMillis, Integer alignCheckpoints) { + SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Integer datesAsEpochMillis, Integer alignCheckpoints, Integer usePit) { this.maxPageSearchSize = maxPageSearchSize; this.docsPerSecond = docsPerSecond; this.datesAsEpochMillis = datesAsEpochMillis; this.alignCheckpoints = alignCheckpoints; + this.usePit = usePit; } @Override @@ -107,6 +120,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(ALIGN_CHECKPOINTS.getPreferredName(), alignCheckpoints > 0 ? true : false); } } + if (usePit != null) { + if (usePit.equals(DEFAULT_USE_PIT)) { + builder.field(USE_PIT.getPreferredName(), (Boolean) null); + } else { + builder.field(USE_PIT.getPreferredName(), usePit > 0 ? true : false); + } + } builder.endObject(); return builder; } @@ -127,6 +147,10 @@ public Boolean getAlignCheckpoints() { return alignCheckpoints != null ? alignCheckpoints > 0 : null; } + public Boolean getUsePit() { + return usePit != null ? usePit > 0 : null; + } + @Override public boolean equals(Object other) { if (other == this) { @@ -140,12 +164,13 @@ public boolean equals(Object other) { return Objects.equals(maxPageSearchSize, that.maxPageSearchSize) && Objects.equals(docsPerSecond, that.docsPerSecond) && Objects.equals(datesAsEpochMillis, that.datesAsEpochMillis) - && Objects.equals(alignCheckpoints, that.alignCheckpoints); + && Objects.equals(alignCheckpoints, that.alignCheckpoints) + && Objects.equals(usePit, that.usePit); } @Override public int hashCode() { - return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints); + return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit); } public static Builder builder() { @@ -157,6 +182,7 @@ public static class Builder { private Float docsPerSecond; private Integer datesAsEpochMillis; private Integer alignCheckpoints; + private Integer usePit; /** * Sets the paging maximum paging maxPageSearchSize that transform can use when @@ -215,8 +241,23 @@ public Builder setAlignCheckpoints(Boolean alignCheckpoints) { return this; } + /** + * Whether the point in time API should be used for search. + * Point in time is a more resource friendly way to query. It is used by default. In case of problems + * you can disable the point in time API usage with this setting. + * + * An explicit `null` resets to default. + * + * @param usePit true if the point in time API should be used. + * @return the {@link Builder} with usePit set. + */ + public Builder setUsePit(Boolean usePit) { + this.usePit = usePit == null ? DEFAULT_USE_PIT : usePit ? 1 : 0; + return this; + } + public SettingsConfig build() { - return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints); + return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit); } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/SettingsConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/SettingsConfigTests.java index ce7398aa7a645..ef3389ba9e5db 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/SettingsConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/SettingsConfigTests.java @@ -31,6 +31,7 @@ public static SettingsConfig randomSettingsConfig() { randomBoolean() ? null : randomIntBetween(10, 10_000), randomBoolean() ? null : randomFloat(), randomBoolean() ? null : randomIntBetween(-1, 1), + randomBoolean() ? null : randomIntBetween(-1, 1), randomBoolean() ? null : randomIntBetween(-1, 1) ); } @@ -74,6 +75,7 @@ public void testExplicitNullOnWriteParser() throws IOException { assertNull(settingsAsMap.getOrDefault("docs_per_second", "not_set")); assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set")); assertThat(settingsAsMap.getOrDefault("align_checkpoints", "not_set"), equalTo("not_set")); + assertThat(settingsAsMap.getOrDefault("use_point_in_time", "not_set"), equalTo("not_set")); config = fromString("{\"dates_as_epoch_millis\" : null}"); assertFalse(config.getDatesAsEpochMillis()); @@ -83,6 +85,7 @@ public void testExplicitNullOnWriteParser() throws IOException { assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set")); assertNull(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set")); assertThat(settingsAsMap.getOrDefault("align_checkpoints", "not_set"), equalTo("not_set")); + assertThat(settingsAsMap.getOrDefault("use_point_in_time", "not_set"), equalTo("not_set")); config = fromString("{\"align_checkpoints\" : null}"); assertFalse(config.getAlignCheckpoints()); @@ -92,6 +95,10 @@ public void testExplicitNullOnWriteParser() throws IOException { assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set")); assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set")); assertNull(settingsAsMap.getOrDefault("align_checkpoints", "not_set")); + assertThat(settingsAsMap.getOrDefault("use_point_in_time", "not_set"), equalTo("not_set")); + + config = fromString("{\"use_point_in_time\" : null}"); + assertFalse(config.getUsePit()); } public void testExplicitNullOnWriteBuilder() throws IOException { @@ -104,6 +111,7 @@ public void testExplicitNullOnWriteBuilder() throws IOException { assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set")); assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set")); assertThat(settingsAsMap.getOrDefault("align_checkpoints", "not_set"), equalTo("not_set")); + assertThat(settingsAsMap.getOrDefault("use_point_in_time", "not_set"), equalTo("not_set")); SettingsConfig emptyConfig = new SettingsConfig.Builder().build(); assertNull(emptyConfig.getMaxPageSearchSize()); @@ -121,6 +129,7 @@ public void testExplicitNullOnWriteBuilder() throws IOException { assertNull(settingsAsMap.getOrDefault("docs_per_second", "not_set")); assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set")); assertThat(settingsAsMap.getOrDefault("align_checkpoints", "not_set"), equalTo("not_set")); + assertThat(settingsAsMap.getOrDefault("use_point_in_time", "not_set"), equalTo("not_set")); config = new SettingsConfig.Builder().setDatesAsEpochMillis(null).build(); // returns false, however it's `null` as in "use default", checked next diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/SettingsConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/SettingsConfigTests.java index 5d1591f6bda0b..bb1ab2fffba2f 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/SettingsConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/SettingsConfigTests.java @@ -24,6 +24,7 @@ public static org.elasticsearch.xpack.core.transform.transforms.SettingsConfig r randomBoolean() ? null : randomIntBetween(10, 10_000), randomBoolean() ? null : randomFloat(), randomBoolean() ? null : randomIntBetween(0, 1), + randomBoolean() ? null : randomIntBetween(0, 1), randomBoolean() ? null : randomIntBetween(0, 1) ); } @@ -36,6 +37,7 @@ public static void assertHlrcEquals( assertEquals(serverTestInstance.getDocsPerSecond(), clientInstance.getDocsPerSecond()); assertEquals(serverTestInstance.getDatesAsEpochMillis(), clientInstance.getDatesAsEpochMillis()); assertEquals(serverTestInstance.getAlignCheckpoints(), clientInstance.getAlignCheckpoints()); + assertEquals(serverTestInstance.getUsePit(), clientInstance.getUsePit()); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java index 6f079a9da2bfd..9b745f60e9025 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java @@ -39,6 +39,7 @@ public final class TransformField { public static final ParseField DOCS_PER_SECOND = new ParseField("docs_per_second"); public static final ParseField DATES_AS_EPOCH_MILLIS = new ParseField("dates_as_epoch_millis"); public static final ParseField ALIGN_CHECKPOINTS = new ParseField("align_checkpoints"); + public static final ParseField USE_PIT = new ParseField("use_point_in_time"); public static final ParseField FIELD = new ParseField("field"); public static final ParseField SYNC = new ParseField("sync"); public static final ParseField TIME = new ParseField("time"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfig.java index 6c33776aa2b03..6b47aba9560b3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfig.java @@ -38,12 +38,13 @@ public class SettingsConfig implements Writeable, ToXContentObject { private static final float DEFAULT_DOCS_PER_SECOND = -1F; private static final int DEFAULT_DATES_AS_EPOCH_MILLIS = -1; private static final int DEFAULT_ALIGN_CHECKPOINTS = -1; + private static final int DEFAULT_USE_PIT = -1; private static ConstructingObjectParser createParser(boolean lenient) { ConstructingObjectParser parser = new ConstructingObjectParser<>( "transform_config_settings", lenient, - args -> new SettingsConfig((Integer) args[0], (Float) args[1], (Integer) args[2], (Integer) args[3]) + args -> new SettingsConfig((Integer) args[0], (Float) args[1], (Integer) args[2], (Integer) args[3], (Integer) args[4]) ); parser.declareIntOrNull(optionalConstructorArg(), DEFAULT_MAX_PAGE_SEARCH_SIZE, TransformField.MAX_PAGE_SEARCH_SIZE); parser.declareFloatOrNull(optionalConstructorArg(), DEFAULT_DOCS_PER_SECOND, TransformField.DOCS_PER_SECOND); @@ -61,6 +62,13 @@ private static ConstructingObjectParser createParser(boole TransformField.ALIGN_CHECKPOINTS, ValueType.BOOLEAN_OR_NULL ); + // this boolean requires 4 possible values: true, false, not_specified, default, therefore using a custom parser + parser.declareField( + optionalConstructorArg(), + p -> p.currentToken() == XContentParser.Token.VALUE_NULL ? DEFAULT_USE_PIT : p.booleanValue() ? 1 : 0, + TransformField.USE_PIT, + ValueType.BOOLEAN_OR_NULL + ); return parser; } @@ -68,25 +76,40 @@ private static ConstructingObjectParser createParser(boole private final Float docsPerSecond; private final Integer datesAsEpochMillis; private final Integer alignCheckpoints; + private final Integer usePit; public SettingsConfig() { - this(null, null, (Integer) null, (Integer) null); + this(null, null, (Integer) null, (Integer) null, (Integer) null); } - public SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Boolean datesAsEpochMillis, Boolean alignCheckpoints) { + public SettingsConfig( + Integer maxPageSearchSize, + Float docsPerSecond, + Boolean datesAsEpochMillis, + Boolean alignCheckpoints, + Boolean usePit + ) { this( maxPageSearchSize, docsPerSecond, datesAsEpochMillis == null ? null : datesAsEpochMillis ? 1 : 0, - alignCheckpoints == null ? null : alignCheckpoints ? 1 : 0 + alignCheckpoints == null ? null : alignCheckpoints ? 1 : 0, + usePit == null ? null : usePit ? 1 : 0 ); } - public SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Integer datesAsEpochMillis, Integer alignCheckpoints) { + public SettingsConfig( + Integer maxPageSearchSize, + Float docsPerSecond, + Integer datesAsEpochMillis, + Integer alignCheckpoints, + Integer usePit + ) { this.maxPageSearchSize = maxPageSearchSize; this.docsPerSecond = docsPerSecond; this.datesAsEpochMillis = datesAsEpochMillis; this.alignCheckpoints = alignCheckpoints; + this.usePit = usePit; } public SettingsConfig(final StreamInput in) throws IOException { @@ -102,6 +125,11 @@ public SettingsConfig(final StreamInput in) throws IOException { } else { this.alignCheckpoints = DEFAULT_ALIGN_CHECKPOINTS; } + if (in.getVersion().onOrAfter(Version.V_8_1_0)) { // todo: V_7_16_1 + this.usePit = in.readOptionalInt(); + } else { + this.usePit = DEFAULT_USE_PIT; + } } public Integer getMaxPageSearchSize() { @@ -128,6 +156,14 @@ public Integer getAlignCheckpointsForUpdate() { return alignCheckpoints; } + public Boolean getUsePit() { + return usePit != null ? (usePit > 0) || (usePit == DEFAULT_USE_PIT) : null; + } + + public Integer getUsePitForUpdate() { + return usePit; + } + public ActionRequestValidationException validate(ActionRequestValidationException validationException) { if (maxPageSearchSize != null && (maxPageSearchSize < 10 || maxPageSearchSize > MultiBucketConsumerService.DEFAULT_MAX_BUCKETS)) { validationException = addValidationError( @@ -154,6 +190,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_15_0)) { out.writeOptionalInt(alignCheckpoints); } + if (out.getVersion().onOrAfter(Version.V_8_1_0)) { // todo: V_7_16_1 + out.writeOptionalInt(usePit); + } } @Override @@ -172,6 +211,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (alignCheckpoints != null && (alignCheckpoints.equals(DEFAULT_ALIGN_CHECKPOINTS) == false)) { builder.field(TransformField.ALIGN_CHECKPOINTS.getPreferredName(), alignCheckpoints > 0 ? true : false); } + if (usePit != null && (usePit.equals(DEFAULT_USE_PIT) == false)) { + builder.field(TransformField.USE_PIT.getPreferredName(), usePit > 0 ? true : false); + } builder.endObject(); return builder; } @@ -189,12 +231,13 @@ public boolean equals(Object other) { return Objects.equals(maxPageSearchSize, that.maxPageSearchSize) && Objects.equals(docsPerSecond, that.docsPerSecond) && Objects.equals(datesAsEpochMillis, that.datesAsEpochMillis) - && Objects.equals(alignCheckpoints, that.alignCheckpoints); + && Objects.equals(alignCheckpoints, that.alignCheckpoints) + && Objects.equals(usePit, that.usePit); } @Override public int hashCode() { - return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints); + return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit); } @Override @@ -211,6 +254,7 @@ public static class Builder { private Float docsPerSecond; private Integer datesAsEpochMillis; private Integer alignCheckpoints; + private Integer usePit; /** * Default builder @@ -227,6 +271,7 @@ public Builder(SettingsConfig base) { this.docsPerSecond = base.docsPerSecond; this.datesAsEpochMillis = base.datesAsEpochMillis; this.alignCheckpoints = base.alignCheckpoints; + this.usePit = base.usePit; } /** @@ -286,6 +331,21 @@ public Builder setAlignCheckpoints(Boolean alignCheckpoints) { return this; } + /** + * Whether the point in time API should be used for search. + * Point in time is a more resource friendly way to query. It is used per default. In case of problems + * you can disable the point in time API usage with this setting. + * + * An explicit `null` resets to default. + * + * @param usePit true if the point in time API should be used. + * @return the {@link Builder} with usePit set. + */ + public Builder setUsePit(Boolean usePit) { + this.usePit = usePit == null ? DEFAULT_USE_PIT : usePit ? 1 : 0; + return this; + } + /** * Update settings according to given settings config. * @@ -313,12 +373,15 @@ public Builder update(SettingsConfig update) { ? null : update.getAlignCheckpointsForUpdate(); } + if (update.getUsePitForUpdate() != null) { + this.usePit = update.getUsePitForUpdate().equals(DEFAULT_USE_PIT) ? null : update.getUsePitForUpdate(); + } return this; } public SettingsConfig build() { - return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints); + return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit); } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java index 90e1e1389b9a8..fd42e8104dd82 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java @@ -52,7 +52,12 @@ */ public class TransformConfig extends AbstractDiffable implements Writeable, ToXContentObject { - public static final Version CONFIG_VERSION_LAST_CHANGED = Version.V_7_15_0; + /** + * Version of the last time the config defaults have been changed. + * Whenever defaults change, we must re-write the config on update in a way it + * does not change behavior. + */ + public static final Version CONFIG_VERSION_LAST_DEFAULTS_CHANGED = Version.V_7_15_0; public static final String NAME = "data_frame_transform_config"; public static final ParseField HEADERS = new ParseField("headers"); /** Version in which {@code FieldCapabilitiesRequest.runtime_fields} field was introduced. */ @@ -585,7 +590,7 @@ public static TransformConfig rewriteForUpdate(final TransformConfig transformCo // quick check if a rewrite is required, if none found just return the original // a failing quick check, does not mean a rewrite is necessary if (transformConfig.getVersion() != null - && transformConfig.getVersion().onOrAfter(CONFIG_VERSION_LAST_CHANGED) + && transformConfig.getVersion().onOrAfter(CONFIG_VERSION_LAST_DEFAULTS_CHANGED) && (transformConfig.getPivotConfig() == null || transformConfig.getPivotConfig().getMaxPageSearchSize() == null)) { return transformConfig; } @@ -616,7 +621,8 @@ private static TransformConfig applyRewriteForUpdate(Builder builder) { maxPageSearchSize, builder.getSettings().getDocsPerSecond(), builder.getSettings().getDatesAsEpochMillis(), - builder.getSettings().getAlignCheckpoints() + builder.getSettings().getAlignCheckpoints(), + builder.getSettings().getUsePit() ) ); } @@ -628,19 +634,21 @@ private static TransformConfig applyRewriteForUpdate(Builder builder) { builder.getSettings().getMaxPageSearchSize(), builder.getSettings().getDocsPerSecond(), true, - builder.getSettings().getAlignCheckpoints() + builder.getSettings().getAlignCheckpoints(), + builder.getSettings().getUsePit() ) ); } // 3. set align_checkpoints to false for transforms < 7.15 to keep BWC - if (builder.getVersion() != null && builder.getVersion().before(CONFIG_VERSION_LAST_CHANGED)) { + if (builder.getVersion() != null && builder.getVersion().before(Version.V_7_15_0)) { builder.setSettings( new SettingsConfig( builder.getSettings().getMaxPageSearchSize(), builder.getSettings().getDocsPerSecond(), builder.getSettings().getDatesAsEpochMillis(), - false + false, + builder.getSettings().getUsePit() ) ); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfigTests.java index 8103ef5ac7174..f7bc060750a82 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfigTests.java @@ -34,12 +34,19 @@ public static SettingsConfig randomSettingsConfig() { randomBoolean() ? null : randomIntBetween(10, 10_000), randomBoolean() ? null : randomFloat(), randomBoolean() ? null : randomIntBetween(0, 1), + randomBoolean() ? null : randomIntBetween(0, 1), randomBoolean() ? null : randomIntBetween(0, 1) ); } public static SettingsConfig randomNonEmptySettingsConfig() { - return new SettingsConfig(randomIntBetween(10, 10_000), randomFloat(), randomIntBetween(0, 1), randomIntBetween(0, 1)); + return new SettingsConfig( + randomIntBetween(10, 10_000), + randomFloat(), + randomIntBetween(0, 1), + randomIntBetween(0, 1), + randomIntBetween(0, 1) + ); } @Before @@ -82,6 +89,9 @@ public void testExplicitNullParsing() throws IOException { assertThat(fromString("{\"align_checkpoints\" : null}").getAlignCheckpointsForUpdate(), equalTo(-1)); assertNull(fromString("{}").getAlignCheckpointsForUpdate()); + + assertThat(fromString("{\"use_point_in_time\" : null}").getUsePitForUpdate(), equalTo(-1)); + assertNull(fromString("{}").getUsePitForUpdate()); } public void testUpdateUsingBuilder() throws IOException { @@ -89,7 +99,8 @@ public void testUpdateUsingBuilder() throws IOException { "{\"max_page_search_size\" : 10000, " + "\"docs_per_second\" :42, " + "\"dates_as_epoch_millis\": true, " - + "\"align_checkpoints\": false}" + + "\"align_checkpoints\": false," + + "\"use_point_in_time\": false}" ); SettingsConfig.Builder builder = new SettingsConfig.Builder(config); @@ -99,25 +110,29 @@ public void testUpdateUsingBuilder() throws IOException { assertThat(builder.build().getDocsPerSecond(), equalTo(42F)); assertThat(builder.build().getDatesAsEpochMillisForUpdate(), equalTo(1)); assertThat(builder.build().getAlignCheckpointsForUpdate(), equalTo(0)); + assertThat(builder.build().getUsePitForUpdate(), equalTo(0)); builder.update(fromString("{\"max_page_search_size\" : null}")); assertNull(builder.build().getMaxPageSearchSize()); assertThat(builder.build().getDocsPerSecond(), equalTo(42F)); assertThat(builder.build().getDatesAsEpochMillisForUpdate(), equalTo(1)); assertThat(builder.build().getAlignCheckpointsForUpdate(), equalTo(0)); + assertThat(builder.build().getUsePitForUpdate(), equalTo(0)); builder.update( fromString( "{\"max_page_search_size\" : 77, " + "\"docs_per_second\" :null, " + "\"dates_as_epoch_millis\": null, " - + "\"align_checkpoints\": null}" + + "\"align_checkpoints\": null," + + "\"use_point_in_time\": null}" ) ); assertThat(builder.build().getMaxPageSearchSize(), equalTo(77)); assertNull(builder.build().getDocsPerSecond()); assertNull(builder.build().getDatesAsEpochMillisForUpdate()); assertNull(builder.build().getAlignCheckpointsForUpdate()); + assertNull(builder.build().getUsePitForUpdate()); } public void testOmmitDefaultsOnWriteParser() throws IOException { @@ -151,6 +166,12 @@ public void testOmmitDefaultsOnWriteParser() throws IOException { settingsAsMap = xContentToMap(config); assertTrue(settingsAsMap.isEmpty()); + + config = fromString("{\"use_point_in_time\" : null}"); + assertThat(config.getUsePitForUpdate(), equalTo(-1)); + + settingsAsMap = xContentToMap(config); + assertTrue(settingsAsMap.isEmpty()); } public void testOmmitDefaultsOnWriteBuilder() throws IOException { @@ -184,6 +205,12 @@ public void testOmmitDefaultsOnWriteBuilder() throws IOException { settingsAsMap = xContentToMap(config); assertTrue(settingsAsMap.isEmpty()); + + config = new SettingsConfig.Builder().setUsePit(null).build(); + assertThat(config.getUsePitForUpdate(), equalTo(-1)); + + settingsAsMap = xContentToMap(config); + assertTrue(settingsAsMap.isEmpty()); } private Map xContentToMap(ToXContent xcontent) throws IOException { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java index 68b9848ffbddf..c7ec82ab727f4 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java @@ -113,7 +113,7 @@ public void testApply() { TimeValue frequency = TimeValue.timeValueSeconds(10); SyncConfig syncConfig = new TimeSyncConfig("time_field", TimeValue.timeValueSeconds(30)); String newDescription = "new description"; - SettingsConfig settings = new SettingsConfig(4_000, 4_000.400F, true, true); + SettingsConfig settings = new SettingsConfig(4_000, 4_000.400F, true, true, true); Map newMetadata = randomMetadata(); RetentionPolicyConfig retentionPolicyConfig = new TimeRetentionPolicyConfig("time_field", new TimeValue(60_000)); update = new TransformConfigUpdate( @@ -168,7 +168,7 @@ public void testApplySettings() { null, null, null, - new SettingsConfig(4_000, null, (Boolean) null, null), + new SettingsConfig(4_000, null, (Boolean) null, null, null), null, null ); @@ -187,7 +187,7 @@ public void testApplySettings() { null, null, null, - new SettingsConfig(null, 43.244F, (Boolean) null, null), + new SettingsConfig(null, 43.244F, (Boolean) null, null, null), null, null ); @@ -198,14 +198,32 @@ public void testApplySettings() { assertThat(updatedConfig.getSettings().getAlignCheckpoints(), equalTo(config.getSettings().getAlignCheckpoints())); // now reset to default using the magic -1 - update = new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(-1, null, (Boolean) null, null), null, null); + update = new TransformConfigUpdate( + null, + null, + null, + null, + null, + new SettingsConfig(-1, null, (Boolean) null, null, null), + null, + null + ); updatedConfig = update.apply(updatedConfig); assertNull(updatedConfig.getSettings().getMaxPageSearchSize()); assertThat(updatedConfig.getSettings().getDocsPerSecond(), equalTo(43.244F)); assertThat(updatedConfig.getSettings().getDatesAsEpochMillis(), equalTo(config.getSettings().getDatesAsEpochMillis())); assertThat(updatedConfig.getSettings().getAlignCheckpoints(), equalTo(config.getSettings().getAlignCheckpoints())); - update = new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(-1, -1F, (Boolean) null, null), null, null); + update = new TransformConfigUpdate( + null, + null, + null, + null, + null, + new SettingsConfig(-1, -1F, (Boolean) null, null, null), + null, + null + ); updatedConfig = update.apply(updatedConfig); assertNull(updatedConfig.getSettings().getMaxPageSearchSize()); assertNull(updatedConfig.getSettings().getDocsPerSecond()); diff --git a/x-pack/plugin/core/src/test/resources/rest-api-spec/schema/transform_config.schema.json b/x-pack/plugin/core/src/test/resources/rest-api-spec/schema/transform_config.schema.json index 4a18e647b74c2..4f6accd936d1c 100644 --- a/x-pack/plugin/core/src/test/resources/rest-api-spec/schema/transform_config.schema.json +++ b/x-pack/plugin/core/src/test/resources/rest-api-spec/schema/transform_config.schema.json @@ -191,6 +191,12 @@ "title": "max page search size", "type": "integer", "default": 500 + }, + "use_point_in_time": { + "$id": "#root/settings/use_point_in_time", + "title": "use_point_in_time", + "type": "boolean", + "default": true } } }, diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index f883d5638f72d..c5db5c79a0b48 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -31,6 +31,7 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.reindex.BulkByScrollResponse; @@ -42,6 +43,7 @@ import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; @@ -111,6 +113,19 @@ class ClientTransformIndexer extends TransformIndexer { // TODO: move into context constructor context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint); + + if (transformConfig.getSettings().getUsePit() != null) { + disablePit = transformConfig.getSettings().getUsePit() == false; + } + } + + @Override + public void applyNewSettings(SettingsConfig newSettings) { + if (newSettings.getUsePit() != null) { + disablePit = newSettings.getUsePit() == false; + } + + super.applyNewSettings(newSettings); } @Override @@ -508,6 +523,26 @@ void doSearch(Tuple namedSearchRequest, ActionListener {}, e -> {})); @@ -171,7 +171,7 @@ public void testTransformUpdateNoAction() throws InterruptedException { ); assertConfiguration(listener -> transformConfigManager.getTransformConfiguration(minCompatibleConfig.getId(), listener), config -> { assertNotNull(config); - assertEquals(TransformConfig.CONFIG_VERSION_LAST_CHANGED, config.getVersion()); + assertEquals(TransformConfig.CONFIG_VERSION_LAST_DEFAULTS_CHANGED, config.getVersion()); }); } @@ -183,7 +183,7 @@ public void testTransformUpdateRewrite() throws InterruptedException { VersionUtils.randomVersionBetween( random(), Version.V_7_2_0, - VersionUtils.getPreviousVersion(TransformConfig.CONFIG_VERSION_LAST_CHANGED) + VersionUtils.getPreviousVersion(TransformConfig.CONFIG_VERSION_LAST_DEFAULTS_CHANGED) ) ); @@ -267,7 +267,7 @@ public void testTransformUpdateRewrite() throws InterruptedException { VersionUtils.randomVersionBetween( random(), Version.V_7_2_0, - VersionUtils.getPreviousVersion(TransformConfig.CONFIG_VERSION_LAST_CHANGED) + VersionUtils.getPreviousVersion(TransformConfig.CONFIG_VERSION_LAST_DEFAULTS_CHANGED) ) ); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java index 604a997cca1d9..a4389d9191b4f 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java @@ -22,9 +22,11 @@ import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.Client; import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.search.SearchContextMissingException; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.PointInTimeBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.ShardSearchContextId; @@ -36,6 +38,7 @@ import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; +import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests; @@ -51,6 +54,7 @@ import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; import java.time.Instant; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -286,6 +290,92 @@ public void testPitInjectionIfPitNotSupported() throws InterruptedException { } } + public void testDisablePit() throws InterruptedException { + TransformConfig config = TransformConfigTests.randomTransformConfig(); + boolean pitEnabled = config.getSettings().getUsePit() == null || config.getSettings().getUsePit(); + + try (PitMockClient client = new PitMockClient(getTestName(), true)) { + MockClientTransformIndexer indexer = new MockClientTransformIndexer( + mock(ThreadPool.class), + new TransformServices( + mock(IndexBasedTransformConfigManager.class), + mock(TransformCheckpointService.class), + mock(TransformAuditor.class), + mock(SchedulerEngine.class) + ), + mock(CheckpointProvider.class), + new AtomicReference<>(IndexerState.STOPPED), + null, + client, + mock(TransformIndexerStats.class), + config, + null, + new TransformCheckpoint( + "transform", + Instant.now().toEpochMilli(), + 0L, + Collections.emptyMap(), + Instant.now().toEpochMilli() + ), + new TransformCheckpoint( + "transform", + Instant.now().toEpochMilli(), + 2L, + Collections.emptyMap(), + Instant.now().toEpochMilli() + ), + new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME), + mock(TransformContext.class), + false + ); + + this.assertAsync(listener -> indexer.doNextSearch(0, listener), response -> { + if (pitEnabled) { + assertEquals("the_pit_id+", response.pointInTimeId()); + } else { + assertNull(response.pointInTimeId()); + } + }); + + // reverse the setting + indexer.applyNewSettings(new SettingsConfig.Builder().setUsePit(pitEnabled == false).build()); + + this.assertAsync(listener -> indexer.doNextSearch(0, listener), response -> { + if (pitEnabled) { + assertNull(response.pointInTimeId()); + } else { + assertEquals("the_pit_id+", response.pointInTimeId()); + } + }); + } + } + + public void testHandlePitIndexNotFound() throws InterruptedException { + // simulate a deleted index due to ILM + try (PitMockClient client = new PitMockClient(getTestName(), true)) { + ClientTransformIndexer indexer = createTestIndexer(client); + SearchRequest searchRequest = new SearchRequest("deleted-index"); + searchRequest.source().pointInTimeBuilder(new PointInTimeBuilder("the_pit_id")); + Tuple namedSearchRequest = new Tuple<>("test-handle-pit-index-not-found", searchRequest); + this.assertAsync(listener -> indexer.doSearch(namedSearchRequest, listener), response -> { + // if the pit got deleted, we know it retried + assertNull(response.pointInTimeId()); + }); + } + + // simulate a deleted index that is essential, search must fail (after a retry without pit) + try (PitMockClient client = new PitMockClient(getTestName(), true)) { + ClientTransformIndexer indexer = createTestIndexer(client); + SearchRequest searchRequest = new SearchRequest("essential-deleted-index"); + searchRequest.source().pointInTimeBuilder(new PointInTimeBuilder("the_pit_id")); + Tuple namedSearchRequest = new Tuple<>("test-handle-pit-index-not-found", searchRequest); + indexer.doSearch(namedSearchRequest, ActionListener.wrap(r -> fail("expected a failure, got response"), e -> { + assertTrue(e instanceof IndexNotFoundException); + assertEquals("no such index [essential-deleted-index]", e.getMessage()); + })); + } + } + private static class MockClientTransformIndexer extends ClientTransformIndexer { MockClientTransformIndexer( @@ -366,6 +456,17 @@ protected void } else if (request instanceof SearchRequest) { SearchRequest searchRequest = (SearchRequest) request; + // if pit is used and deleted-index is given throw index not found + if (searchRequest.pointInTimeBuilder() != null && Arrays.binarySearch(searchRequest.indices(), "deleted-index") >= 0) { + listener.onFailure(new IndexNotFoundException("deleted-index")); + return; + } + + if (Arrays.binarySearch(searchRequest.indices(), "essential-deleted-index") >= 0) { + listener.onFailure(new IndexNotFoundException("essential-deleted-index")); + return; + } + // throw search context missing for the 4th run if (searchRequest.pointInTimeBuilder() != null && "the_pit_id+++".equals(searchRequest.pointInTimeBuilder().getEncodedId())) { @@ -419,6 +520,10 @@ private void assertAsync(Consumer> function, Consumer f } private ClientTransformIndexer createTestIndexer() { + return createTestIndexer(null); + } + + private ClientTransformIndexer createTestIndexer(Client client) { ThreadPool threadPool = mock(ThreadPool.class); when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class)); @@ -433,9 +538,9 @@ private ClientTransformIndexer createTestIndexer() { mock(CheckpointProvider.class), new AtomicReference<>(IndexerState.STOPPED), null, - mock(Client.class), + client == null ? mock(Client.class) : client, mock(TransformIndexerStats.class), - mock(TransformConfig.class), + TransformConfigTests.randomTransformConfig(), null, new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 0L, Collections.emptyMap(), Instant.now().toEpochMilli()), new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 2L, Collections.emptyMap(), Instant.now().toEpochMilli()), diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java index 790bb72785d1e..663afa49c5292 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java @@ -309,7 +309,7 @@ public void testPageSizeAdapt() throws Exception { randomPivotConfig(), null, randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000), - new SettingsConfig(pageSize, null, (Boolean) null, null), + new SettingsConfig(pageSize, null, (Boolean) null, null, null), null, null, null, @@ -384,7 +384,7 @@ public void testDoProcessAggNullCheck() { randomPivotConfig(), null, randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000), - new SettingsConfig(pageSize, null, (Boolean) null, null), + new SettingsConfig(pageSize, null, (Boolean) null, null, null), null, null, null, @@ -448,7 +448,7 @@ public void testScriptError() throws Exception { randomPivotConfig(), null, randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000), - new SettingsConfig(pageSize, null, (Boolean) null, null), + new SettingsConfig(pageSize, null, (Boolean) null, null, null), null, null, null, diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java index 39a7b239d1cb9..44a7915e698ad 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java @@ -506,7 +506,7 @@ public void testStopAtCheckpointForThrottledTransform() throws Exception { randomPivotConfig(), null, randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000), - new SettingsConfig(null, Float.valueOf(1.0f), (Boolean) null, (Boolean) null), + new SettingsConfig(null, Float.valueOf(1.0f), (Boolean) null, (Boolean) null, null), null, null, null,