Skip to content

Commit

Permalink
[apm-data] Apply lazy rollover on index template creation (elastic#11…
Browse files Browse the repository at this point in the history
…6219)

* Apply lazy rollover on index template creation

We should trigger a lazy rollover of existing data streams
regardless of whether the index template is being created
or updated. This ensures that the apm-data plugin will roll
over data streams that were previously using the Fleet
integration package.

* Update docs/changelog/116219.yaml

* Update docs/changelog/116219.yaml

* Add YAML REST test for template reinstallation

* Code review suggestion

elastic#116219 (comment)

* Remove wait_for_events from setup

This doesn't guarantee the templates are set up,
it only increases the chances; and we disable the
plugin at the start of the test anyway.
  • Loading branch information
axw committed Nov 5, 2024
1 parent 2532b8c commit 6eef6bf
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 51 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/116219.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 116219
summary: "[apm-data] Apply lazy rollover on index template creation"
area: Data streams
type: bug
issues:
- 116230
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,23 @@ setup:
- contains: {index_templates: {name: logs-apm.app@template}}
- contains: {index_templates: {name: logs-apm.error@template}}

---
"Test template reinstallation":
- skip:
reason: contains is a newly added assertion
features: contains
- do:
indices.delete_index_template:
name: traces-apm@template
- do:
cluster.health:
wait_for_events: languid
- do:
indices.get_index_template:
name: traces-apm@template
- length: {index_templates: 1}
- contains: {index_templates: {name: traces-apm@template}}

---
"Test traces-apm-* data stream indexing":
- skip:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
---
setup:
- do:
indices.put_index_template:
name: traces-low-prio
body:
data_stream: {}
index_patterns: ["traces-*"]
priority: 1

---
"Test data stream rollover on template installation":
- skip:
awaits_fix: "https://github.com/elastic/elasticsearch/issues/102360"

# Disable the apm-data plugin and delete the traces-apm@template index
# template so traces-low-prio takes effect.
- do:
cluster.put_settings:
body:
transient:
xpack.apm_data.registry.enabled: false
- do:
indices.delete_index_template:
name: traces-apm@template
- do:
indices.create_data_stream:
name: traces-apm-testing
- do:
indices.get_data_stream:
name: traces-apm-testing
- match: {data_streams.0.template: traces-low-prio}

# Re-enable the apm-data plugin, after which the traces-apm@template
# index template should be recreated and trigger a lazy rollover on
# the traces-apm-testing data stream.
- do:
cluster.put_settings:
body:
transient:
xpack.apm_data.registry.enabled: true
- do:
cluster.health:
wait_for_events: languid
- do:
indices.get_data_stream:
name: traces-apm-testing
- length: {data_streams: 1}
- match: {data_streams.0.template: traces-apm@template}
- match: {data_streams.0.rollover_on_write: true}

Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ protected Map<String, ComposableIndexTemplate> getComposableTemplateConfigs() {
}

@Override
protected boolean applyRolloverAfterTemplateV2Upgrade() {
protected boolean applyRolloverAfterTemplateV2Update() {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ private void addComposableTemplatesIfMissing(ClusterState state) {
}
} else if (Objects.isNull(currentTemplate)) {
logger.debug("adding composable template [{}] for [{}], because it doesn't exist", templateName, getOrigin());
putComposableTemplate(state, templateName, newTemplate.getValue(), creationCheck, false);
putComposableTemplate(state, templateName, newTemplate.getValue(), creationCheck);
} else if (Objects.isNull(currentTemplate.version()) || newTemplate.getValue().version() > currentTemplate.version()) {
// IndexTemplateConfig now enforces templates contain a `version` property, so if the template doesn't have one we can
// safely assume it's an old version of the template.
Expand All @@ -411,7 +411,7 @@ private void addComposableTemplatesIfMissing(ClusterState state) {
currentTemplate.version(),
newTemplate.getValue().version()
);
putComposableTemplate(state, templateName, newTemplate.getValue(), creationCheck, true);
putComposableTemplate(state, templateName, newTemplate.getValue(), creationCheck);
} else {
creationCheck.set(false);
logger.trace(
Expand All @@ -433,11 +433,11 @@ private void addComposableTemplatesIfMissing(ClusterState state) {

/**
* Returns true if the cluster state contains all of the component templates needed by the composable template. If this registry
* requires automatic rollover after index template upgrades (see {@link #applyRolloverAfterTemplateV2Upgrade()}), this method also
* requires automatic rollover after index template upgrades (see {@link #applyRolloverAfterTemplateV2Update()}), this method also
* verifies that the installed components templates are of the right version.
*/
private boolean componentTemplatesInstalled(ClusterState state, ComposableIndexTemplate indexTemplate) {
if (applyRolloverAfterTemplateV2Upgrade() == false) {
if (applyRolloverAfterTemplateV2Update() == false) {
// component templates and index templates can be updated independently, we only need to know that the required component
// templates are available
return state.metadata().componentTemplates().keySet().containsAll(indexTemplate.getRequiredComponentTemplates());
Expand Down Expand Up @@ -533,8 +533,7 @@ private void putComposableTemplate(
ClusterState state,
final String templateName,
final ComposableIndexTemplate indexTemplate,
final AtomicBoolean creationCheck,
final boolean isUpgrade
final AtomicBoolean creationCheck
) {
final Executor executor = threadPool.generic();
executor.execute(() -> {
Expand All @@ -549,8 +548,8 @@ private void putComposableTemplate(
@Override
public void onResponse(AcknowledgedResponse response) {
if (response.isAcknowledged()) {
if (isUpgrade && applyRolloverAfterTemplateV2Upgrade()) {
invokeRollover(state, templateName, indexTemplate, creationCheck);
if (applyRolloverAfterTemplateV2Update()) {
invokeRollover(state, templateName, indexTemplate, () -> creationCheck.set((false)));
} else {
creationCheck.set(false);
}
Expand Down Expand Up @@ -763,12 +762,13 @@ public void onFailure(Exception e) {

/**
* Allows registries to opt-in for automatic rollover of "relevant" data streams immediately after a composable index template gets
* upgraded. If set to {@code true}, then every time a composable index template is being upgraded, all data streams of which name
* matches this template's index patterns AND of all matching templates the upgraded one has the highest priority, will be rolled over.
* updated, including its initial installation. If set to {@code true}, then every time a composable index template is being updated,
* all data streams of which name matches this template's index patterns AND of all matching templates the upgraded one has the highest
* priority, will be rolled over.
*
* @return {@code true} if this registry wants to apply automatic rollovers after template V2 upgrades
*/
protected boolean applyRolloverAfterTemplateV2Upgrade() {
protected boolean applyRolloverAfterTemplateV2Update() {
return false;
}

Expand All @@ -782,50 +782,56 @@ protected void onPutPipelineFailure(String pipelineId, Exception e) {
logger.error(() -> format("error adding ingest pipeline template [%s] for [%s]", pipelineId, getOrigin()), e);
}

/**
* invokeRollover rolls over any data streams matching the index template,
* and then invokes runAfter.
*/
private void invokeRollover(
final ClusterState state,
final String templateName,
final ComposableIndexTemplate indexTemplate,
final AtomicBoolean creationCheck
final Runnable runAfter
) {
final Executor executor = threadPool.generic();
executor.execute(() -> {
List<String> rolloverTargets = findRolloverTargetDataStreams(state, templateName, indexTemplate);
if (rolloverTargets.isEmpty() == false) {
GroupedActionListener<RolloverResponse> groupedActionListener = new GroupedActionListener<>(
rolloverTargets.size(),
new ActionListener<>() {
@Override
public void onResponse(Collection<RolloverResponse> rolloverResponses) {
creationCheck.set(false);
onRolloversBulkResponse(rolloverResponses);
}
if (rolloverTargets.isEmpty()) {
runAfter.run();
return;
}
GroupedActionListener<RolloverResponse> groupedActionListener = new GroupedActionListener<>(
rolloverTargets.size(),
new ActionListener<>() {
@Override
public void onResponse(Collection<RolloverResponse> rolloverResponses) {
runAfter.run();
onRolloversBulkResponse(rolloverResponses);
}

@Override
public void onFailure(Exception e) {
creationCheck.set(false);
onRolloverFailure(e);
}
@Override
public void onFailure(Exception e) {
runAfter.run();
onRolloverFailure(e);
}
);
for (String rolloverTarget : rolloverTargets) {
logger.info(
"rolling over data stream [{}] lazily as a followup to the upgrade of the [{}] index template [{}]",
rolloverTarget,
getOrigin(),
templateName
);
RolloverRequest request = new RolloverRequest(rolloverTarget, null);
request.lazy(true);
request.masterNodeTimeout(TimeValue.MAX_VALUE);
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
getOrigin(),
request,
groupedActionListener,
(req, listener) -> client.execute(RolloverAction.INSTANCE, req, listener)
);
}
);
for (String rolloverTarget : rolloverTargets) {
logger.info(
"rolling over data stream [{}] lazily as a followup to the upgrade of the [{}] index template [{}]",
rolloverTarget,
getOrigin(),
templateName
);
RolloverRequest request = new RolloverRequest(rolloverTarget, null);
request.lazy(true);
request.masterNodeTimeout(TimeValue.MAX_VALUE);
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
getOrigin(),
request,
groupedActionListener,
(req, listener) -> client.execute(RolloverAction.INSTANCE, req, listener)
);
}
});
}
Expand Down Expand Up @@ -865,7 +871,21 @@ static List<String> findRolloverTargetDataStreams(ClusterState state, String tem
.stream()
// Limit to checking data streams that match any of the index template's index patterns
.filter(ds -> indexTemplate.indexPatterns().stream().anyMatch(pattern -> Regex.simpleMatch(pattern, ds.getName())))
.filter(ds -> templateName.equals(MetadataIndexTemplateService.findV2Template(metadata, ds.getName(), ds.isHidden())))
.filter(ds -> {
final String dsTemplateName = MetadataIndexTemplateService.findV2Template(metadata, ds.getName(), ds.isHidden());
if (templateName.equals(dsTemplateName)) {
return true;
}
// findV2Template did not match templateName, which implies one of two things:
// - indexTemplate has a lower priority than the index template matching for ds, OR
// - indexTemplate does not yet exist in cluster state (i.e. because it's in the process of being
// installed or updated)
//
// Because of the second case, we must check if indexTemplate's priority is greater than the matching
// index template, in case it would take precedence after installation/update.
final ComposableIndexTemplate dsTemplate = metadata.templatesV2().get(dsTemplateName);
return dsTemplate == null || indexTemplate.priorityOrZero() > dsTemplate.priorityOrZero();
})
.map(DataStream::getName)
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ private IngestPipelineConfig loadIngestPipeline(String name, int version, @Nulla
}

@Override
protected boolean applyRolloverAfterTemplateV2Upgrade() {
protected boolean applyRolloverAfterTemplateV2Update() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ public void testAutomaticRollover() throws Exception {
assertThat(suppressed[0].getMessage(), startsWith("Failed to rollover logs-my_app-"));
}

public void testNoRolloverForFreshInstalledIndexTemplate() throws Exception {
public void testRolloverForFreshInstalledIndexTemplate() throws Exception {
DiscoveryNode node = DiscoveryNodeUtils.create("node");
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();

Expand Down Expand Up @@ -473,9 +473,10 @@ public void testNoRolloverForFreshInstalledIndexTemplate() throws Exception {
registry.setApplyRollover(true);
registry.clusterChanged(event);
assertBusy(() -> assertThat(putIndexTemplateCounter.get(), equalTo(1)));
// the index component is first installed, not upgraded, therefore rollover should not be triggered
// rollover should be triggered even for the first installation, since the template
// may now take precedence over a data stream's existing index template
Thread.sleep(100L);
assertThat(rolloverCounter.get(), equalTo(0));
assertThat(rolloverCounter.get(), equalTo(2));
}

public void testThatTemplatesAreNotUpgradedWhenNotNeeded() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void setPolicyUpgradeRequired(boolean policyUpgradeRequired) {
}

@Override
protected boolean applyRolloverAfterTemplateV2Upgrade() {
protected boolean applyRolloverAfterTemplateV2Update() {
return applyRollover.get();
}

Expand Down

0 comments on commit 6eef6bf

Please sign in to comment.