Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce lazy rollover for mapping updates in data streams #103309

Merged
merged 42 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
7b34c4e
Introduce lazy option in the rollover request
gmarouli Dec 8, 2023
6faa9c5
Persist rollover needed flag in data streams
gmarouli Dec 11, 2023
00d112b
Test lazy rollover applicability
gmarouli Dec 11, 2023
bb558aa
Add java rest test
gmarouli Dec 12, 2023
92e0165
Update docs/changelog/103309.yaml
gmarouli Dec 12, 2023
99af4f3
Reset rollover_needed when the ds rolls over
gmarouli Dec 12, 2023
807837f
Add ER to the changelog
gmarouli Dec 12, 2023
5daf72f
Small refactoring
gmarouli Dec 12, 2023
a2488cb
Add missing field from docs tests
gmarouli Dec 12, 2023
41f1cf3
Fix rollover cancellation test that was using concrete index
gmarouli Dec 13, 2023
449f821
Fix NPE when parsing bwc XContent
gmarouli Dec 13, 2023
c49a6a5
Merge branch 'main' into add-lazy-rollover-config
elasticmachine Dec 13, 2023
b145caa
Update docs/changelog/103309.yaml
gmarouli Dec 13, 2023
a09a074
Revert "Update docs/changelog/103309.yaml"
gmarouli Dec 13, 2023
7cd426e
Enhance the documentation
gmarouli Dec 13, 2023
ee2446a
Update docs/changelog/103309.yaml
gmarouli Dec 13, 2023
eb5d2ea
Update docs/reference/indices/rollover-index.asciidoc
gmarouli Dec 13, 2023
98142c3
Rename lazy rollover field on data stream
gmarouli Dec 13, 2023
d54b066
Throw an error when lazy is not applicable
gmarouli Dec 13, 2023
56685a7
Rename to rolloverOnWrite
gmarouli Dec 13, 2023
e773672
Fix docs test
gmarouli Dec 14, 2023
e04b8ff
Merge with main
gmarouli Dec 14, 2023
ce51961
Fix dry run & lazy rollover combo
gmarouli Dec 14, 2023
32687af
Extract parsed failure store fields to variables.
gmarouli Dec 14, 2023
4ebcbe3
Merge branch 'main' into add-lazy-rollover-config
elasticmachine Dec 19, 2023
bbfa007
Merge branch 'main' into add-lazy-rollover-config
elasticmachine Dec 20, 2023
b8afeb7
Merge with main
gmarouli Dec 21, 2023
f0916e5
Add missing return statement
gmarouli Dec 22, 2023
b29e26a
Captured the failure in a test
gmarouli Dec 22, 2023
9c6c6c1
Update rest-api-spec
gmarouli Dec 22, 2023
3a0cd13
Added extra test
gmarouli Dec 22, 2023
4380e31
Merge with main
gmarouli Dec 29, 2023
630fe2c
Merge branch 'main' into add-lazy-rollover-config
gmarouli Dec 29, 2023
211e765
Merge branch 'main' into add-lazy-rollover-config
elasticmachine Jan 2, 2024
244688f
Merge with main
gmarouli Jan 3, 2024
614c45d
Merge branch 'main' into add-lazy-rollover-config
gmarouli Jan 3, 2024
7ef5da7
Rollover a data stream at the next write request when lazy rollover a…
gmarouli Jan 4, 2024
afc73d3
Merge with main
gmarouli Jan 4, 2024
f5cfc3b
Merge branch 'main' into add-lazy-rollover-config
elasticmachine Jan 5, 2024
f26ab13
Rephrase docs
gmarouli Jan 8, 2024
29ec8c5
Update javadoc
gmarouli Jan 8, 2024
3b80435
Merge branch 'main' into add-lazy-rollover-config
gmarouli Jan 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/103309.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 103309
summary: Introduce lazy rollover for mapping updates in data streams
area: Data streams
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,8 @@ stream's oldest backing index.
"hidden": false,
"system": false,
"allow_custom_routing": false,
"replicated": false
"replicated": false,
"rollover_on_write": false
}
]
}
Expand Down
1 change: 1 addition & 0 deletions docs/reference/data-streams/downsampling-ilm.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ following. Note the original `index_name`: `.ds-datastream-<timestamp>-000001`.
"system": false,
"allow_custom_routing": false,
"replicated": false,
"rollover_on_write": false,
"time_series": {
"temporal_ranges": [
{
Expand Down
1 change: 1 addition & 0 deletions docs/reference/data-streams/downsampling-manual.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ This returns:
"system": false,
"allow_custom_routing": false,
"replicated": false,
"rollover_on_write": false,
"time_series": {
"temporal_ranges": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ and that the next generation index will also be managed by {ilm-init}:
"hidden": false,
"system": false,
"allow_custom_routing": false,
"replicated": false
"replicated": false,
"rollover_on_write": false
}
]
}
Expand Down Expand Up @@ -275,7 +276,8 @@ GET _data_stream/dsl-data-stream
"hidden": false,
"system": false,
"allow_custom_routing": false,
"replicated": false
"replicated": false,
"rollover_on_write": false
}
]
}
Expand Down Expand Up @@ -352,7 +354,8 @@ GET _data_stream/dsl-data-stream
"hidden": false,
"system": false,
"allow_custom_routing": false,
"replicated": false
"replicated": false,
"rollover_on_write": false
}
]
}
Expand Down Expand Up @@ -449,7 +452,8 @@ GET _data_stream/dsl-data-stream
"hidden": false,
"system": false,
"allow_custom_routing": false,
"replicated": false
"replicated": false,
"rollover_on_write": false
}
]
}
Expand Down
6 changes: 4 additions & 2 deletions docs/reference/indices/get-data-stream.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ The API returns the following response:
"hidden": false,
"system": false,
"allow_custom_routing": false,
"replicated": false
"replicated": false,
"rollover_on_write": false
},
{
"name": "my-data-stream-two",
Expand Down Expand Up @@ -339,7 +340,8 @@ The API returns the following response:
"hidden": false,
"system": false,
"allow_custom_routing": false,
"replicated": false
"replicated": false,
"rollover_on_write": false
}
]
}
Expand Down
24 changes: 24 additions & 0 deletions docs/reference/indices/rollover-index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ include::{es-repo-dir}/indices/create-index.asciidoc[tag=index-name-reqs]
If `true`, checks whether the current index satisfies the specified
`conditions` but does not perform a rollover. Defaults to `false`.

`lazy`::
(Optional, Boolean)
If `true`, signals that the data stream, applies only to data streams, will be
rolled over when the next indexing operation occurs. Defaults to `false`.

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=wait_for_active_shards]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=timeoutparms]
Expand Down Expand Up @@ -204,6 +209,11 @@ conditions were specified, this is an empty object.
index met the condition.
====

`lazy`::
(Boolean)
If `true`, {es} did not perform the rollover, but successfully marked the data stream to be rolled
over at the next indexing event.

[[rollover-index-api-example]]
==== {api-examples-title}

Expand All @@ -218,6 +228,17 @@ POST my-data-stream/_rollover
----
// TEST[setup:my_data_stream]

The following request rolls over a data stream lazily, meaning that the data stream
will roll over at the next indexing event. This ensures that mapping and setting changes
will be applied to the coming data, but it will avoid creating extra backing indices for
data streams with slow ingestion.

[source,console]
----
POST my-data-stream/_rollover?lazy
----
// TEST[continued]

:target: data stream
:index: write index

Expand Down Expand Up @@ -257,6 +278,7 @@ The API returns:
"new_index": ".ds-my-data-stream-2099.05.07-000002",
"rolled_over": true,
"dry_run": false,
"lazy": false,
"conditions": {
"[max_age: 7d]": false,
"[max_docs: 1000]": true,
Expand Down Expand Up @@ -328,6 +350,7 @@ The API returns:
"new_index": "my-index-2099.05.07-000002",
"rolled_over": true,
"dry_run": false,
"lazy": false,
"conditions": {
"[max_age: 7d]": false,
"[max_docs: 1000]": true,
Expand Down Expand Up @@ -399,6 +422,7 @@ The API returns:
"new_index": "my-index-2099.05.07-000002",
"rolled_over": true,
"dry_run": false,
"lazy": false,
"conditions": {
"[max_age: 7d]": false,
"[max_docs: 1000]": true,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.datastreams;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;

import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;

public class LazyRolloverDataStreamIT extends DisabledSecurityDataStreamTestCase {

@SuppressWarnings("unchecked")
public void testLazyRollover() throws Exception {
Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/lazy-ds-template");
putComposableIndexTemplateRequest.setJsonEntity("""
{
"index_patterns": ["lazy-ds*"],
"data_stream": {}
}
""");
assertOK(client().performRequest(putComposableIndexTemplateRequest));

String dataStreamName = "lazy-ds";

Request createDocRequest = new Request("POST", "/" + dataStreamName + "/_doc?refresh=true");
createDocRequest.setJsonEntity("{ \"@timestamp\": \"2020-10-22\", \"a\": 1 }");

assertOK(client().performRequest(createDocRequest));

final Response rolloverResponse = client().performRequest(new Request("POST", "/" + dataStreamName + "/_rollover?lazy"));
Map<String, Object> rolloverResponseMap = entityAsMap(rolloverResponse);
assertThat((String) rolloverResponseMap.get("old_index"), startsWith(".ds-lazy-ds-"));
assertThat((String) rolloverResponseMap.get("old_index"), endsWith("-000001"));
assertThat((String) rolloverResponseMap.get("new_index"), startsWith(".ds-lazy-ds-"));
assertThat((String) rolloverResponseMap.get("new_index"), endsWith("-000002"));
assertThat(rolloverResponseMap.get("lazy"), equalTo(true));
assertThat(rolloverResponseMap.get("dry_run"), equalTo(false));
assertThat(rolloverResponseMap.get("acknowledged"), equalTo(true));
assertThat(rolloverResponseMap.get("rolled_over"), equalTo(false));
assertThat(rolloverResponseMap.get("conditions"), equalTo(Map.of()));

{
final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + dataStreamName));
List<Object> dataStreams = (List<Object>) entityAsMap(dataStreamResponse).get("data_streams");
assertThat(dataStreams.size(), is(1));
Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
assertThat(dataStream.get("name"), equalTo(dataStreamName));
assertThat(dataStream.get("rollover_on_write"), is(true));
assertThat(((List<Object>) dataStream.get("indices")).size(), is(1));
}

assertAcknowledged(client().performRequest(new Request("POST", "/" + dataStreamName + "/_rollover")));
{
final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + dataStreamName));
List<Object> dataStreams = (List<Object>) entityAsMap(dataStreamResponse).get("data_streams");
assertThat(dataStreams.size(), is(1));
Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
assertThat(dataStream.get("name"), equalTo(dataStreamName));
assertThat(dataStream.get("rollover_on_write"), is(false));
assertThat(((List<Object>) dataStream.get("indices")).size(), is(2));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
*/
public abstract class BlockedSearcherRestCancellationTestCase extends HttpSmokeTestCase {

private static final Setting<Boolean> BLOCK_SEARCHER_SETTING = Setting.boolSetting(
protected static final Setting<Boolean> BLOCK_SEARCHER_SETTING = Setting.boolSetting(
"index.block_searcher",
false,
Setting.Property.IndexScope
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,21 @@
package org.elasticsearch.http;

import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.settings.Settings;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;

public class RolloverRestCancellationIT extends BlockedSearcherRestCancellationTestCase {

public void testRolloverRestCancellation() throws Exception {
runTest(new Request(HttpPost.METHOD_NAME, "test/_rollover"), RolloverAction.NAME);
assertAcked(
prepareCreate("test-000001").addAlias(new Alias("test-alias").writeIndex(true))
.setSettings(Settings.builder().put(BLOCK_SEARCHER_SETTING.getKey(), true))
);
ensureGreen("test-000001");
runTest(new Request(HttpPost.METHOD_NAME, "test-alias/_rollover"), RolloverAction.NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ static TransportVersion def(int id) {
public static final TransportVersion NODE_STATS_REQUEST_SIMPLIFIED = def(8_561_00_0);
public static final TransportVersion TEXT_EXPANSION_TOKEN_PRUNING_CONFIG_ADDED = def(8_562_00_0);
public static final TransportVersion ESQL_ASYNC_QUERY = def(8_563_00_0);
public static final TransportVersion LAZY_ROLLOVER_ADDED = def(8_564_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
import org.elasticsearch.cluster.metadata.MetadataIndexAliasesService;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
Expand Down Expand Up @@ -290,6 +291,7 @@ private RolloverResult rolloverDataStream(
createIndexClusterStateRequest.setMatchingTemplate(templateV2);
assert createIndexClusterStateRequest.performReroute() == false
: "rerouteCompletionIsNotRequired() assumes reroute is not called by underlying service";

ClusterState newState = createIndexService.applyCreateIndexRequest(
currentState,
createIndexClusterStateRequest,
Expand All @@ -312,6 +314,7 @@ private RolloverResult rolloverDataStream(
metadataBuilder = withShardSizeForecastForWriteIndex(dataStreamName, metadataBuilder);

newState = ClusterState.builder(newState).metadata(metadataBuilder).build();
newState = MetadataDataStreamsService.setRolloverOnWrite(newState, dataStreamName, false);

return new RolloverResult(newWriteIndexName, originalWriteIndex.getName(), newState);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ public boolean hasMinConditions() {
return conditions.values().stream().anyMatch(c -> Condition.Type.MIN == c.type());
}

/**
* Returns true if there is at least one condition of any type
*/
public boolean hasConditions() {
return conditions.isEmpty() == false;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteableCollection(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package org.elasticsearch.action.admin.indices.rollover;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
Expand Down Expand Up @@ -96,6 +97,7 @@ public class RolloverRequest extends AcknowledgedRequest<RolloverRequest> implem
private String rolloverTarget;
private String newIndexName;
private boolean dryRun;
private boolean lazy;
private RolloverConditions conditions = new RolloverConditions();
// the index name "_na_" is never read back, what matters are settings, mappings and aliases
private CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_");
Expand All @@ -107,6 +109,11 @@ public RolloverRequest(StreamInput in) throws IOException {
dryRun = in.readBoolean();
conditions = new RolloverConditions(in);
createIndexRequest = new CreateIndexRequest(in);
if (in.getTransportVersion().onOrAfter(TransportVersions.LAZY_ROLLOVER_ADDED)) {
lazy = in.readBoolean();
} else {
lazy = false;
}
}

RolloverRequest() {}
Expand Down Expand Up @@ -142,6 +149,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(dryRun);
conditions.writeTo(out);
createIndexRequest.writeTo(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.LAZY_ROLLOVER_ADDED)) {
out.writeBoolean(lazy);
}
}

@Override
Expand Down Expand Up @@ -194,6 +204,13 @@ public void setConditions(RolloverConditions conditions) {
this.conditions = conditions;
}

/**
* Sets if an unconditional rollover should wait for a document to come before it gets executed
*/
public void lazy(boolean lazy) {
this.lazy = lazy;
}

public boolean isDryRun() {
return dryRun;
}
Expand All @@ -214,6 +231,10 @@ public String getNewIndexName() {
return newIndexName;
}

public boolean isLazy() {
return lazy;
}

/**
* Given the results of evaluating each individual condition, determine whether the rollover request should proceed -- that is,
* whether the conditions are met.
Expand Down Expand Up @@ -257,6 +278,7 @@ public boolean equals(Object o) {
}
RolloverRequest that = (RolloverRequest) o;
return dryRun == that.dryRun
&& lazy == that.lazy
&& Objects.equals(rolloverTarget, that.rolloverTarget)
&& Objects.equals(newIndexName, that.newIndexName)
&& Objects.equals(conditions, that.conditions)
Expand All @@ -265,6 +287,6 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(rolloverTarget, newIndexName, dryRun, conditions, createIndexRequest);
return Objects.hash(rolloverTarget, newIndexName, dryRun, conditions, createIndexRequest, lazy);
}
}
Loading