Introduce lazy rollover for mapping updates in data streams (#103309)
In this PR we introduce a flag indicating that a data stream needs to be rolled over before the next document is indexed.
gmarouli authored Jan 8, 2024
1 parent af916a0 commit 046cdea
Showing 35 changed files with 868 additions and 168 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/103309.yaml
@@ -0,0 +1,6 @@
pr: 103309
summary: Introduce lazy rollover for mapping updates in data streams
area: Data streams
type: enhancement
issues:
- 89346
Original file line number Diff line number Diff line change
@@ -592,7 +592,8 @@ stream's oldest backing index.
"hidden": false,
"system": false,
"allow_custom_routing": false,
"replicated": false
"replicated": false,
"rollover_on_write": false
}
]
}
1 change: 1 addition & 0 deletions docs/reference/data-streams/downsampling-ilm.asciidoc
@@ -326,6 +326,7 @@ following. Note the original `index_name`: `.ds-datastream-<timestamp>-000001`.
"system": false,
"allow_custom_routing": false,
"replicated": false,
"rollover_on_write": false,
"time_series": {
"temporal_ranges": [
{
1 change: 1 addition & 0 deletions docs/reference/data-streams/downsampling-manual.asciidoc
@@ -372,6 +372,7 @@ This returns:
"system": false,
"allow_custom_routing": false,
"replicated": false,
"rollover_on_write": false,
"time_series": {
"temporal_ranges": [
{
Original file line number Diff line number Diff line change
@@ -139,7 +139,8 @@ and that the next generation index will also be managed by {ilm-init}:
"hidden": false,
"system": false,
"allow_custom_routing": false,
"replicated": false
"replicated": false,
"rollover_on_write": false
}
]
}
@@ -275,7 +276,8 @@ GET _data_stream/dsl-data-stream
"hidden": false,
"system": false,
"allow_custom_routing": false,
"replicated": false
"replicated": false,
"rollover_on_write": false
}
]
}
@@ -352,7 +354,8 @@ GET _data_stream/dsl-data-stream
"hidden": false,
"system": false,
"allow_custom_routing": false,
"replicated": false
"replicated": false,
"rollover_on_write": false
}
]
}
@@ -449,7 +452,8 @@ GET _data_stream/dsl-data-stream
"hidden": false,
"system": false,
"allow_custom_routing": false,
"replicated": false
"replicated": false,
"rollover_on_write": false
}
]
}
14 changes: 13 additions & 1 deletion docs/reference/data-streams/use-a-data-stream.asciidoc
@@ -117,12 +117,24 @@ GET /_data_stream/my-data-stream/_stats?human=true
=== Manually roll over a data stream

Use the <<indices-rollover-index,rollover API>> to manually
<<data-streams-rollover,roll over>> a data stream:
<<data-streams-rollover,roll over>> a data stream. You have
two options when manually rolling over:

1. To immediately trigger a rollover:
+
[source,console]
----
POST /my-data-stream/_rollover/
----
2. To postpone the rollover until the next indexing event occurs:
+
[source,console]
----
POST /my-data-stream/_rollover?lazy
----
+
Use the second option to avoid empty backing indices in data streams
that are not updated often, for example after a mapping update as sketched below.
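
A common use for the lazy variant is applying a mapping or setting change without
creating an empty backing index right away. A minimal sketch, assuming a matching
index template named `my-template` (hypothetical) whose mappings were just updated:

[source,console]
----
PUT /_index_template/my-template
{
  "index_patterns": ["my-data-stream*"],
  "data_stream": {},
  "template": {
    "mappings": {
      "properties": {
        "message": {
          "type": "wildcard"
        }
      }
    }
  }
}

POST /my-data-stream/_rollover?lazy
----

The updated mappings take effect when the next indexed document triggers the
pending rollover.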

[discrete]
[[open-closed-backing-indices]]
11 changes: 9 additions & 2 deletions docs/reference/indices/get-data-stream.asciidoc
@@ -262,6 +262,11 @@ The conditions which will trigger the rollover of a backing index as configured
`cluster.lifecycle.default.rollover`. This property is an implementation detail and it will only be retrieved when the query
param `include_defaults` is set to `true`. The contents of this field are subject to change.
=====
`rollover_on_write`::
(Boolean)
If `true`, the next write to this data stream will trigger a rollover first and the document will be
indexed in the new backing index. If the rollover fails, the indexing request will fail too.
====

[[get-data-stream-api-example]]
@@ -311,7 +316,8 @@ The API returns the following response:
"hidden": false,
"system": false,
"allow_custom_routing": false,
"replicated": false
"replicated": false,
"rollover_on_write": false
},
{
"name": "my-data-stream-two",
@@ -339,7 +345,8 @@ The API returns the following response:
"hidden": false,
"system": false,
"allow_custom_routing": false,
"replicated": false
"replicated": false,
"rollover_on_write": false
}
]
}
24 changes: 24 additions & 0 deletions docs/reference/indices/rollover-index.asciidoc
@@ -114,6 +114,11 @@ include::{es-repo-dir}/indices/create-index.asciidoc[tag=index-name-reqs]
If `true`, checks whether the current index satisfies the specified
`conditions` but does not perform a rollover. Defaults to `false`.

`lazy`::
(Optional, Boolean)
If `true`, signals that the data stream will be rolled over when the next
indexing operation occurs. Applies only to data streams. Defaults to `false`.

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=wait_for_active_shards]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=timeoutparms]
@@ -204,6 +209,11 @@ conditions were specified, this is an empty object.
index met the condition.
====

`lazy`::
(Boolean)
If `true`, {es} did not perform the rollover, but successfully marked the data stream to be rolled
over at the next indexing event.

[[rollover-index-api-example]]
==== {api-examples-title}

@@ -218,6 +228,17 @@ POST my-data-stream/_rollover
----
// TEST[setup:my_data_stream]

The following request rolls over a data stream lazily, meaning that the data stream
will roll over at the next indexing event. This ensures that mapping and setting changes
are applied to incoming data while avoiding the creation of empty backing indices in
data streams with slow ingestion.

[source,console]
----
POST my-data-stream/_rollover?lazy
----
// TEST[continued]
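
Based on the integration tests added in this PR, the lazy variant acknowledges the
request but reports `rolled_over` as `false` and `lazy` as `true`: the prospective
backing index is named but not yet created, and `conditions` is empty. An abridged
sketch of the response:

[source,console-result]
----
{
  "acknowledged": true,
  "old_index": ".ds-my-data-stream-2099.05.07-000001",
  "new_index": ".ds-my-data-stream-2099.05.07-000002",
  "rolled_over": false,
  "dry_run": false,
  "lazy": true,
  "conditions": {}
}
----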

:target: data stream
:index: write index

@@ -257,6 +278,7 @@ The API returns:
"new_index": ".ds-my-data-stream-2099.05.07-000002",
"rolled_over": true,
"dry_run": false,
"lazy": false,
"conditions": {
"[max_age: 7d]": false,
"[max_docs: 1000]": true,
@@ -328,6 +350,7 @@ The API returns:
"new_index": "my-index-2099.05.07-000002",
"rolled_over": true,
"dry_run": false,
"lazy": false,
"conditions": {
"[max_age: 7d]": false,
"[max_docs: 1000]": true,
@@ -399,6 +422,7 @@ The API returns:
"new_index": "my-index-2099.05.07-000002",
"rolled_over": true,
"dry_run": false,
"lazy": false,
"conditions": {
"[max_age: 7d]": false,
"[max_docs: 1000]": true,
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.datastreams;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;

import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;

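/**
 * Integration tests for lazy rollover: marking a data stream with {@code ?lazy} sets
 * its {@code rollover_on_write} flag and defers the actual rollover to the next
 * indexing event.
 */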
public class LazyRolloverDataStreamIT extends DisabledSecurityDataStreamTestCase {

@SuppressWarnings("unchecked")
public void testLazyRollover() throws Exception {
Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/lazy-ds-template");
putComposableIndexTemplateRequest.setJsonEntity("""
{
"index_patterns": ["lazy-ds*"],
"data_stream": {}
}
""");
assertOK(client().performRequest(putComposableIndexTemplateRequest));

String dataStreamName = "lazy-ds";

Request createDocRequest = new Request("POST", "/" + dataStreamName + "/_doc?refresh=true");
createDocRequest.setJsonEntity("{ \"@timestamp\": \"2020-10-22\", \"a\": 1 }");
assertOK(client().performRequest(createDocRequest));

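// Mark the data stream for lazy rollover: the response names the prospective backing
// index but reports rolled_over=false, because the rollover is deferred.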
final Response rolloverResponse = client().performRequest(new Request("POST", "/" + dataStreamName + "/_rollover?lazy"));
Map<String, Object> rolloverResponseMap = entityAsMap(rolloverResponse);
assertThat((String) rolloverResponseMap.get("old_index"), startsWith(".ds-lazy-ds-"));
assertThat((String) rolloverResponseMap.get("old_index"), endsWith("-000001"));
assertThat((String) rolloverResponseMap.get("new_index"), startsWith(".ds-lazy-ds-"));
assertThat((String) rolloverResponseMap.get("new_index"), endsWith("-000002"));
assertThat(rolloverResponseMap.get("lazy"), equalTo(true));
assertThat(rolloverResponseMap.get("dry_run"), equalTo(false));
assertThat(rolloverResponseMap.get("acknowledged"), equalTo(true));
assertThat(rolloverResponseMap.get("rolled_over"), equalTo(false));
assertThat(rolloverResponseMap.get("conditions"), equalTo(Map.of()));

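// The data stream now reports rollover_on_write=true while still having a single backing index.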
{
final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + dataStreamName));
List<Object> dataStreams = (List<Object>) entityAsMap(dataStreamResponse).get("data_streams");
assertThat(dataStreams.size(), is(1));
Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
assertThat(dataStream.get("name"), equalTo(dataStreamName));
assertThat(dataStream.get("rollover_on_write"), is(true));
assertThat(((List<Object>) dataStream.get("indices")).size(), is(1));
}

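// Indexing a document triggers the pending rollover.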
createDocRequest = new Request("POST", "/" + dataStreamName + "/_doc?refresh=true");
createDocRequest.setJsonEntity("{ \"@timestamp\": \"2020-10-23\", \"a\": 2 }");
assertOK(client().performRequest(createDocRequest));

{
final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + dataStreamName));
List<Object> dataStreams = (List<Object>) entityAsMap(dataStreamResponse).get("data_streams");
assertThat(dataStreams.size(), is(1));
Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
assertThat(dataStream.get("name"), equalTo(dataStreamName));
assertThat(dataStream.get("rollover_on_write"), is(false));
assertThat(((List<Object>) dataStream.get("indices")).size(), is(2));
}
}

@SuppressWarnings("unchecked")
public void testLazyRolloverFailsIndexing() throws Exception {
Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/lazy-ds-template");
putComposableIndexTemplateRequest.setJsonEntity("""
{
"index_patterns": ["lazy-ds*"],
"data_stream": {}
}
""");
assertOK(client().performRequest(putComposableIndexTemplateRequest));

String dataStreamName = "lazy-ds";

Request createDocRequest = new Request("POST", "/" + dataStreamName + "/_doc?refresh=true");
createDocRequest.setJsonEntity("{ \"@timestamp\": \"2020-10-22\", \"a\": 1 }");
assertOK(client().performRequest(createDocRequest));

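// Lower the shard limit so that the deferred rollover cannot create the new backing index.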
Request updateClusterSettingsRequest = new Request("PUT", "_cluster/settings");
updateClusterSettingsRequest.setJsonEntity("""
{
"persistent": {
"cluster.max_shards_per_node": 1
}
}""");
assertAcknowledged(client().performRequest(updateClusterSettingsRequest));

final Response rolloverResponse = client().performRequest(new Request("POST", "/" + dataStreamName + "/_rollover?lazy"));
Map<String, Object> rolloverResponseMap = entityAsMap(rolloverResponse);
assertThat((String) rolloverResponseMap.get("old_index"), startsWith(".ds-lazy-ds-"));
assertThat((String) rolloverResponseMap.get("old_index"), endsWith("-000001"));
assertThat((String) rolloverResponseMap.get("new_index"), startsWith(".ds-lazy-ds-"));
assertThat((String) rolloverResponseMap.get("new_index"), endsWith("-000002"));
assertThat(rolloverResponseMap.get("lazy"), equalTo(true));
assertThat(rolloverResponseMap.get("dry_run"), equalTo(false));
assertThat(rolloverResponseMap.get("acknowledged"), equalTo(true));
assertThat(rolloverResponseMap.get("rolled_over"), equalTo(false));
assertThat(rolloverResponseMap.get("conditions"), equalTo(Map.of()));

{
final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + dataStreamName));
List<Object> dataStreams = (List<Object>) entityAsMap(dataStreamResponse).get("data_streams");
assertThat(dataStreams.size(), is(1));
Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
assertThat(dataStream.get("name"), equalTo(dataStreamName));
assertThat(dataStream.get("rollover_on_write"), is(true));
assertThat(((List<Object>) dataStream.get("indices")).size(), is(1));
}

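// The write triggers the pending rollover; since the new backing index cannot be
// created under the shard limit, the indexing request fails as well.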
try {
createDocRequest = new Request("POST", "/" + dataStreamName + "/_doc?refresh=true");
createDocRequest.setJsonEntity("{ \"@timestamp\": \"2020-10-23\", \"a\": 2 }");
client().performRequest(createDocRequest);
fail("Indexing should have failed.");
} catch (ResponseException responseException) {
assertThat(responseException.getMessage(), containsString("this action would add [2] shards"));
}

updateClusterSettingsRequest = new Request("PUT", "_cluster/settings");
updateClusterSettingsRequest.setJsonEntity("""
{
"persistent": {
"cluster.max_shards_per_node": null
}
}""");
assertAcknowledged(client().performRequest(updateClusterSettingsRequest));
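// With the shard limit restored, the next write completes the pending rollover.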
createDocRequest = new Request("POST", "/" + dataStreamName + "/_doc?refresh=true");
createDocRequest.setJsonEntity("{ \"@timestamp\": \"2020-10-23\", \"a\": 2 }");
assertOK(client().performRequest(createDocRequest));
{
final Response dataStreamResponse = client().performRequest(new Request("GET", "/_data_stream/" + dataStreamName));
List<Object> dataStreams = (List<Object>) entityAsMap(dataStreamResponse).get("data_streams");
assertThat(dataStreams.size(), is(1));
Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
assertThat(dataStream.get("name"), equalTo(dataStreamName));
assertThat(dataStream.get("rollover_on_write"), is(false));
assertThat(((List<Object>) dataStream.get("indices")).size(), is(2));
}
}

@SuppressWarnings("unchecked")
public void testLazyRolloverWithConditions() throws Exception {
Request putComposableIndexTemplateRequest = new Request("POST", "/_index_template/lazy-ds-template");
putComposableIndexTemplateRequest.setJsonEntity("""
{
"index_patterns": ["lazy-ds*"],
"data_stream": {}
}
""");
assertOK(client().performRequest(putComposableIndexTemplateRequest));

String dataStreamName = "lazy-ds";

Request createDocRequest = new Request("POST", "/" + dataStreamName + "/_doc?refresh=true");
createDocRequest.setJsonEntity("{ \"@timestamp\": \"2020-10-22\", \"a\": 1 }");

assertOK(client().performRequest(createDocRequest));

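// Lazy rollover must be unconditional; supplying conditions is rejected with a 400.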
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover?lazy");
rolloverRequest.setJsonEntity("{\"conditions\": {\"max_docs\": 1}}");
ResponseException responseError = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest));
assertThat(responseError.getResponse().getStatusLine().getStatusCode(), is(400));
assertThat(responseError.getMessage(), containsString("only without any conditions"));
}
}
Original file line number Diff line number Diff line change
@@ -55,7 +55,7 @@
*/
public abstract class BlockedSearcherRestCancellationTestCase extends HttpSmokeTestCase {

private static final Setting<Boolean> BLOCK_SEARCHER_SETTING = Setting.boolSetting(
protected static final Setting<Boolean> BLOCK_SEARCHER_SETTING = Setting.boolSetting(
"index.block_searcher",
false,
Setting.Property.IndexScope