Skip to content

Commit

Permalink
elastic#111893 Add Warnings For Missing Index Templates (elastic#114589)
Browse files Browse the repository at this point in the history
* Add data stream template validation

to snapshot restore

* Add data stream template validation

to data stream promotion endpoint

* Add new assertion for response headers

Add a new assertion to synchronously execute a request and check the
response contains a specific warning header

* Test for warning header on snapshot restore

When missing templates

* Test for promotion warnings

* Add documentation for the potential error states

* PR changes

* Spotless reformatting

* Add logic to look in snapshot global metadata

This checks if the snapshot contains a matching template for the DS

* Comment on test cleanup to explain it was copied

* Removed cluster service field
  • Loading branch information
lukewhiting authored and kderusso committed Oct 15, 2024
1 parent bbfc390 commit acc5f9a
Show file tree
Hide file tree
Showing 7 changed files with 407 additions and 2 deletions.
4 changes: 4 additions & 0 deletions docs/reference/data-streams/promote-data-stream-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ available, the data stream in the local cluster can be promoted
to a regular data stream, which allows these data streams to
be rolled over in the local cluster.

NOTE: When promoting a data stream, ensure the local cluster has a data stream enabled index template that matches the data stream.
If this is missing, the data stream will not be able to roll over until a matching index template is created.
This will affect the lifecycle management of the data stream and interfere with the data stream size and retention.

[source,console]
----
POST /_data_stream/_promote/my-data-stream
Expand Down
3 changes: 3 additions & 0 deletions docs/reference/snapshot-restore/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ Snapshots don't contain or back up:
* Node configuration files
* <<security-files,Security configuration files>>

NOTE: When restoring a data stream, if the target cluster does not have an index template that matches the data stream, the data stream will not be able to roll over until a matching index template is created.
This will affect the lifecycle management of the data stream and interfere with the data stream size and retention.

[discrete]
[[feature-state]]
=== Feature states
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
Expand Down Expand Up @@ -45,6 +46,7 @@
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotRestoreException;
Expand All @@ -62,7 +64,9 @@
import java.util.stream.Collectors;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoWarningHeaderOnResponse;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertWarningHeaderOnResponse;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
Expand All @@ -80,6 +84,8 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
private static final Map<String, Integer> DOCUMENT_SOURCE = Collections.singletonMap("@timestamp", 123);
public static final String REPO = "repo";
public static final String SNAPSHOT = "snap";
public static final String TEMPLATE_1_ID = "t1";
public static final String TEMPLATE_2_ID = "t2";
private Client client;

private String dsBackingIndexName;
Expand All @@ -103,8 +109,8 @@ public void setup() throws Exception {
Path location = randomRepoPath();
createRepository(REPO, "fs", location);

DataStreamIT.putComposableIndexTemplate("t1", List.of("ds", "other-ds"));
DataStreamIT.putComposableIndexTemplate("t2", """
DataStreamIT.putComposableIndexTemplate(TEMPLATE_1_ID, List.of("ds", "other-ds"));
DataStreamIT.putComposableIndexTemplate(TEMPLATE_2_ID, """
{
"properties": {
"@timestamp": {
Expand Down Expand Up @@ -1335,4 +1341,149 @@ public void testRestoreDataStreamAliasWithConflictingIndicesAlias() throws Excep
);
assertThat(e.getMessage(), containsString("data stream alias and indices alias have the same name (my-alias)"));
}

public void testWarningHeaderOnRestoreWithoutTemplates() throws Exception {
String datastreamName = "ds";

CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices(datastreamName)
.setIncludeGlobalState(false)
.get();

RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
SnapshotId snapshotId = createSnapshotResponse.getSnapshotInfo().snapshotId();
assertEquals(RestStatus.OK, status);

assertEquals(Collections.singletonList(dsBackingIndexName), getSnapshot(REPO, SNAPSHOT).indices());

assertAcked(
client.execute(
DeleteDataStreamAction.INSTANCE,
new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, datastreamName, "other-ds")
)
);

assertAcked(
client.execute(
TransportDeleteComposableIndexTemplateAction.TYPE,
new TransportDeleteComposableIndexTemplateAction.Request(TEMPLATE_1_ID)
).get()
);

RestoreSnapshotRequestBuilder request = client.admin()
.cluster()
.prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices(datastreamName);

assertWarningHeaderOnResponse(
client,
request,
"Snapshot ["
+ snapshotId
+ "] contains data stream ["
+ datastreamName
+ "] but custer does not have a matching index "
+ "template. This will cause rollover to fail until a matching index template is created"
);

}

public void testWarningHeaderAbsentOnRestoreWithTemplates() throws Exception {
String datastreamName = "ds";

CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices(datastreamName)
.setIncludeGlobalState(false)
.get();

RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
SnapshotId snapshotId = createSnapshotResponse.getSnapshotInfo().snapshotId();
assertEquals(RestStatus.OK, status);

assertEquals(Collections.singletonList(dsBackingIndexName), getSnapshot(REPO, SNAPSHOT).indices());

assertAcked(
client.execute(
DeleteDataStreamAction.INSTANCE,
new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, datastreamName, "other-ds", "with-fs")
)
);

RestoreSnapshotRequestBuilder request = client.admin()
.cluster()
.prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices(datastreamName);

assertNoWarningHeaderOnResponse(
client,
request,
"but custer does not have a matching index template. This will cause rollover to fail until a matching index "
+ "template is created"
);

}

/**
* This test is muted as it's awaiting the same fix as {@link #testPartialRestoreSnapshotThatIncludesDataStreamWithGlobalState()}
*/
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/107515")
public void testWarningHeaderOnRestoreTemplateFromSnapshot() throws Exception {
String datastreamName = "ds";

CreateSnapshotResponse createSnapshotResponse = client.admin()
.cluster()
.prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIndices(datastreamName)
.setIncludeGlobalState(true)
.get();

RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
SnapshotId snapshotId = createSnapshotResponse.getSnapshotInfo().snapshotId();
assertEquals(RestStatus.OK, status);

assertEquals(Collections.singletonList(dsBackingIndexName), getSnapshot(REPO, SNAPSHOT).indices());

assertAcked(
client.execute(
DeleteDataStreamAction.INSTANCE,
new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, datastreamName, "other-ds")
)
);

assertAcked(
client.execute(
TransportDeleteComposableIndexTemplateAction.TYPE,
new TransportDeleteComposableIndexTemplateAction.Request(TEMPLATE_1_ID)
).get()
);

RestoreSnapshotRequestBuilder request = client.admin()
.cluster()
.prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setRestoreGlobalState(true)
.setIndices(datastreamName);

assertNoWarningHeaderOnResponse(
client,
request,
"Snapshot ["
+ snapshotId
+ "] contains data stream ["
+ datastreamName
+ "] but custer does not have a matching index "
+ "template. This will cause rollover to fail until a matching index template is created"
);

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
*/
package org.elasticsearch.datastreams.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.datastreams.PromoteDataStreamAction;
Expand All @@ -23,6 +25,8 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.logging.HeaderWarning;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.indices.SystemIndices;
Expand All @@ -31,8 +35,12 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import static org.elasticsearch.core.Strings.format;

public class PromoteDataStreamTransportAction extends AcknowledgedTransportMasterNodeAction<PromoteDataStreamAction.Request> {

private static final Logger logger = LogManager.getLogger(PromoteDataStreamTransportAction.class);

private final SystemIndices systemIndices;

@Inject
Expand Down Expand Up @@ -94,16 +102,41 @@ private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String

static ClusterState promoteDataStream(ClusterState currentState, PromoteDataStreamAction.Request request) {
DataStream dataStream = currentState.getMetadata().dataStreams().get(request.getName());

if (dataStream == null) {
throw new ResourceNotFoundException("data stream [" + request.getName() + "] does not exist");
}

warnIfTemplateMissingForDatastream(dataStream, currentState);

DataStream promotedDataStream = dataStream.promoteDataStream();
Metadata.Builder metadata = Metadata.builder(currentState.metadata());
metadata.put(promotedDataStream);
return ClusterState.builder(currentState).metadata(metadata).build();
}

private static void warnIfTemplateMissingForDatastream(DataStream dataStream, ClusterState currentState) {
var datastreamName = dataStream.getName();

var matchingIndex = currentState.metadata()
.templatesV2()
.values()
.stream()
.filter(cit -> cit.getDataStreamTemplate() != null)
.flatMap(cit -> cit.indexPatterns().stream())
.anyMatch(pattern -> Regex.simpleMatch(pattern, datastreamName));

if (matchingIndex == false) {
String warningMessage = format(
"Data stream [%s] does not have a matching index template. This will cause rollover to fail until a matching index "
+ "template is created",
datastreamName
);
logger.warn(() -> warningMessage);
HeaderWarning.addWarning(warningMessage);
}
}

@Override
protected ClusterBlockException checkBlock(PromoteDataStreamAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAlias;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand All @@ -52,6 +53,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.logging.HeaderWarning;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -398,6 +400,8 @@ private void startRestore(
Map<String, DataStream> dataStreamsToRestore = result.v1();
Map<String, DataStreamAlias> dataStreamAliasesToRestore = result.v2();

validateDataStreamTemplatesExistAndWarnIfMissing(dataStreamsToRestore, snapshotInfo, globalMetadata);

// Remove the data streams from the list of requested indices
requestIndices.removeAll(dataStreamsToRestore.keySet());

Expand Down Expand Up @@ -510,6 +514,35 @@ private void startRestore(
);
}

private void validateDataStreamTemplatesExistAndWarnIfMissing(
Map<String, DataStream> dataStreamsToRestore,
SnapshotInfo snapshotInfo,
Metadata globalMetadata
) {

Stream<ComposableIndexTemplate> streams = Stream.concat(
clusterService.state().metadata().templatesV2().values().stream(),
globalMetadata == null ? Stream.empty() : globalMetadata.templatesV2().values().stream()
);

Set<String> templatePatterns = streams.filter(cit -> cit.getDataStreamTemplate() != null)
.flatMap(cit -> cit.indexPatterns().stream())
.collect(Collectors.toSet());

for (String name : dataStreamsToRestore.keySet()) {
if (templatePatterns.stream().noneMatch(pattern -> Regex.simpleMatch(pattern, name))) {
String warningMessage = format(
"Snapshot [%s] contains data stream [%s] but custer does not have a matching index template. This will cause"
+ " rollover to fail until a matching index template is created",
snapshotInfo.snapshotId(),
name
);
logger.warn(() -> warningMessage);
HeaderWarning.addWarning(warningMessage);
}
}
}

@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
clusterService.submitUnbatchedStateUpdateTask(source, task);
Expand Down
Loading

0 comments on commit acc5f9a

Please sign in to comment.