Excluding system data streams from global and factory retention (elas…
masseyke authored May 3, 2024
1 parent 5f4ef87 commit a561958
Showing 19 changed files with 650 additions and 44 deletions.
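
For context before the diffs: this change makes the data stream lifecycle resolve retention for system data streams from their own configuration only, ignoring the cluster-wide global retention (and any factory retention). The snippet below is a minimal, hypothetical sketch of that resolution rule, written only to illustrate the behavior the new tests verify; the class, record, and method names are invented here and do not match the actual Elasticsearch implementation, and factory retention is left out for brevity.

import java.time.Duration;

final class EffectiveRetentionSketch {

    /** Cluster-wide retention settings; either value may be null when unset. */
    record GlobalRetention(Duration defaultRetention, Duration maxRetention) {}

    /**
     * Resolves the retention the lifecycle would enforce for a single data stream.
     * System data streams always keep their own configured retention; the global
     * default/max only applies to regular data streams.
     */
    static Duration effectiveRetention(Duration configuredRetention, GlobalRetention global, boolean isSystemDataStream) {
        if (isSystemDataStream) {
            return configuredRetention; // global (and factory) retention is ignored entirely
        }
        Duration effective = configuredRetention != null
            ? configuredRetention
            : (global == null ? null : global.defaultRetention());
        if (global != null
            && global.maxRetention() != null
            && (effective == null || effective.compareTo(global.maxRetention()) > 0)) {
            effective = global.maxRetention(); // cap regular data streams at the global maximum
        }
        return effective;
    }

    public static void main(String[] args) {
        GlobalRetention global = new GlobalRetention(Duration.ofSeconds(10), Duration.ofSeconds(10));
        // A system data stream configured with 100 days keeps it despite the 10-second global retention:
        System.out.println(effectiveRetention(Duration.ofDays(100), global, true));  // PT2400H (100 days)
        // A regular data stream with no retention of its own falls back to the global default:
        System.out.println(effectiveRetention(null, global, false));                 // PT10S
    }
}
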
DataStreamLifecycleServiceIT.java
@@ -24,6 +24,7 @@
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry;
@@ -41,12 +42,16 @@
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.datastreams.lifecycle.action.DeleteDataStreamGlobalRetentionAction;
import org.elasticsearch.datastreams.lifecycle.action.PutDataStreamGlobalRetentionAction;
import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthIndicatorService;
import org.elasticsearch.health.Diagnosis;
import org.elasticsearch.health.GetHealthAction;
@@ -58,21 +63,28 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.indices.ExecutorNames;
import org.elasticsearch.indices.SystemDataStreamDescriptor;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.junit.After;

import java.io.IOException;
import java.time.Clock;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
@@ -82,6 +94,8 @@
import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING;
import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.ONE_HUNDRED_MB;
import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.TARGET_MERGE_FACTOR_VALUE;
import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT.TestSystemDataStreamPlugin.SYSTEM_DATA_STREAM_NAME;
import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT.TestSystemDataStreamPlugin.SYSTEM_DATA_STREAM_RETENTION_DAYS;
import static org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthIndicatorService.STAGNATING_BACKING_INDICES_DIAGNOSIS_DEF;
import static org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthIndicatorService.STAGNATING_INDEX_IMPACT;
import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE;
Expand All @@ -102,7 +116,7 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(DataStreamsPlugin.class, MockTransportService.TestPlugin.class);
return List.of(DataStreamsPlugin.class, MockTransportService.TestPlugin.class, TestSystemDataStreamPlugin.class);
}

@Override
@@ -173,6 +187,116 @@ public void testRolloverAndRetention() throws Exception {
});
}

@SuppressWarnings("unchecked")
public void testSystemDataStreamRetention() throws Exception {
/*
* This test makes sure that global data stream retention is ignored by system data streams, and that the configured retention
* for a system data stream is respected instead.
*/
Iterable<DataStreamLifecycleService> dataStreamLifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);
Clock clock = Clock.systemUTC();
AtomicLong now = new AtomicLong(clock.millis());
dataStreamLifecycleServices.forEach(dataStreamLifecycleService -> dataStreamLifecycleService.setNowSupplier(now::get));
try {
// Putting in place a global retention that we expect will be ignored by the system data stream:
final int globalRetentionSeconds = 10;
client().execute(
PutDataStreamGlobalRetentionAction.INSTANCE,
new PutDataStreamGlobalRetentionAction.Request(
TimeValue.timeValueSeconds(globalRetentionSeconds),
TimeValue.timeValueSeconds(globalRetentionSeconds)
)
).actionGet();
try {

CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(SYSTEM_DATA_STREAM_NAME);
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).actionGet();
indexDocs(SYSTEM_DATA_STREAM_NAME, 1);
/*
* First we advance the time to well beyond the global retention (10s) but well under the configured retention (100d).
* We expect to see that rollover has occurred but that the old index has not been deleted since the global retention is
* ignored.
*/
now.addAndGet(TimeValue.timeValueSeconds(3 * globalRetentionSeconds).millis());
assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
new String[] { SYSTEM_DATA_STREAM_NAME }
);
GetDataStreamAction.Response getDataStreamResponse = client().execute(
GetDataStreamAction.INSTANCE,
getDataStreamRequest
).actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(SYSTEM_DATA_STREAM_NAME));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
assertThat(backingIndices.size(), equalTo(2)); // global retention is ignored
// we expect the data stream to have two backing indices since the effective retention is 100 days
String writeIndex = backingIndices.get(1).getName();
assertThat(writeIndex, backingIndexEqualTo(SYSTEM_DATA_STREAM_NAME, 2));
});

// Now we advance the time to well beyond the configured retention. We expect that the older index will have been deleted.
now.addAndGet(TimeValue.timeValueDays(3 * TestSystemDataStreamPlugin.SYSTEM_DATA_STREAM_RETENTION_DAYS).millis());
assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(
new String[] { SYSTEM_DATA_STREAM_NAME }
);
GetDataStreamAction.Response getDataStreamResponse = client().execute(
GetDataStreamAction.INSTANCE,
getDataStreamRequest
).actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(SYSTEM_DATA_STREAM_NAME));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
assertThat(backingIndices.size(), equalTo(1)); // global retention is ignored
// we expect the data stream to have only one backing index, the write one, with generation 2
// as generation 1 would've been deleted by the data stream lifecycle given the configuration
String writeIndex = backingIndices.get(0).getName();
assertThat(writeIndex, backingIndexEqualTo(SYSTEM_DATA_STREAM_NAME, 2));
try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) {
builder.humanReadable(true);
ToXContent.Params withEffectiveRetention = new ToXContent.MapParams(
DataStreamLifecycle.INCLUDE_EFFECTIVE_RETENTION_PARAMS
);
getDataStreamResponse.getDataStreams()
.get(0)
.toXContent(
builder,
withEffectiveRetention,
getDataStreamResponse.getRolloverConfiguration(),
getDataStreamResponse.getGlobalRetention()
);
String serialized = Strings.toString(builder);
Map<String, Object> resultMap = XContentHelper.convertToMap(
XContentType.JSON.xContent(),
serialized,
randomBoolean()
);
assertNotNull(resultMap);
Map<String, Object> lifecycleMap = (Map<String, Object>) resultMap.get("lifecycle");
assertNotNull(lifecycleMap);
assertThat(
lifecycleMap.get("data_retention"),
equalTo(TimeValue.timeValueDays(SYSTEM_DATA_STREAM_RETENTION_DAYS).getStringRep())
);
assertThat(
lifecycleMap.get("effective_retention"),
equalTo(TimeValue.timeValueDays(SYSTEM_DATA_STREAM_RETENTION_DAYS).getStringRep())
);
assertThat(lifecycleMap.get("retention_determined_by"), equalTo("data_stream_configuration"));
assertThat(lifecycleMap.get("enabled"), equalTo(true));
}
});

client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(SYSTEM_DATA_STREAM_NAME)).actionGet();
} finally {
client().execute(DeleteDataStreamGlobalRetentionAction.INSTANCE, new DeleteDataStreamGlobalRetentionAction.Request());
}
} finally {
dataStreamLifecycleServices.forEach(dataStreamLifecycleService -> dataStreamLifecycleService.setNowSupplier(clock::millis));
}
}

public void testOriginationDate() throws Exception {
/*
* In this test, we set up a datastream with 7 day retention. Then we add two indices to it -- one with an origination date 365
@@ -880,4 +1004,51 @@ static void updateLifecycle(String dataStreamName, TimeValue dataRetention) {
);
assertAcked(client().execute(PutDataStreamLifecycleAction.INSTANCE, putDataLifecycleRequest));
}

/*
* This test plugin adds `.system-test` as a known system data stream. The data stream is not created by this plugin. But if it is
* created, it will be a system data stream.
*/
public static class TestSystemDataStreamPlugin extends Plugin implements SystemIndexPlugin {
public static final String SYSTEM_DATA_STREAM_NAME = ".system-test";
public static final int SYSTEM_DATA_STREAM_RETENTION_DAYS = 100;

@Override
public String getFeatureName() {
return "test";
}

@Override
public String getFeatureDescription() {
return "test";
}

@Override
public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
return List.of(
new SystemDataStreamDescriptor(
SYSTEM_DATA_STREAM_NAME,
"test",
SystemDataStreamDescriptor.Type.INTERNAL,
ComposableIndexTemplate.builder()
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.indexPatterns(List.of(DataStream.BACKING_INDEX_PREFIX + SYSTEM_DATA_STREAM_NAME + "*"))
.template(
new Template(
Settings.EMPTY,
null,
null,
DataStreamLifecycle.newBuilder()
.dataRetention(TimeValue.timeValueDays(SYSTEM_DATA_STREAM_RETENTION_DAYS))
.build()
)
)
.build(),
Map.of(),
List.of(),
ExecutorNames.DEFAULT_SYSTEM_INDEX_THREAD_POOLS
)
);
}
}
}
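
For reference, the assertions at the end of testSystemDataStreamRetention above imply that, while the 10-second global retention is set, the lifecycle section of the serialized data stream response contains the entries sketched below. This is an illustration reconstructed from those assertions (TimeValue.timeValueDays(100).getStringRep() renders as "100d"), not captured output; the wrapper class and constant name are invented here.

import java.util.Map;

final class ExpectedSystemDataStreamLifecycle {
    // Entries the test expects to find under "lifecycle" in the serialized response:
    static final Map<String, Object> EXPECTED = Map.of(
        "enabled", true,
        "data_retention", "100d",          // retention configured by the system data stream descriptor
        "effective_retention", "100d",     // the 10-second global retention is ignored
        "retention_determined_by", "data_stream_configuration"
    );

    public static void main(String[] args) {
        EXPECTED.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
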
ExplainDataStreamLifecycleIT.java
@@ -28,7 +28,10 @@
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.datastreams.lifecycle.action.DeleteDataStreamGlobalRetentionAction;
import org.elasticsearch.datastreams.lifecycle.action.PutDataStreamGlobalRetentionAction;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.plugins.Plugin;
@@ -46,6 +49,8 @@

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT.TestSystemDataStreamPlugin.SYSTEM_DATA_STREAM_NAME;
import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT.TestSystemDataStreamPlugin.SYSTEM_DATA_STREAM_RETENTION_DAYS;
import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@@ -59,7 +64,11 @@ public class ExplainDataStreamLifecycleIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(DataStreamsPlugin.class, MockTransportService.TestPlugin.class);
return List.of(
DataStreamsPlugin.class,
MockTransportService.TestPlugin.class,
DataStreamLifecycleServiceIT.TestSystemDataStreamPlugin.class
);
}

@Override
@@ -194,6 +203,67 @@ public void testExplainLifecycle() throws Exception {
}
}

public void testSystemExplainLifecycle() throws Exception {
/*
* This test makes sure that for system data streams, we correctly ignore the global retention when calling
* ExplainDataStreamLifecycle. It is very similar to testExplainLifecycle, but only focuses on the retention for a system index.
*/
// Putting in place a global retention that we expect will be ignored by the system data stream:
final int globalRetentionSeconds = 10;
client().execute(
PutDataStreamGlobalRetentionAction.INSTANCE,
new PutDataStreamGlobalRetentionAction.Request(
TimeValue.timeValueSeconds(globalRetentionSeconds),
TimeValue.timeValueSeconds(globalRetentionSeconds)
)
).actionGet();
try {
String dataStreamName = SYSTEM_DATA_STREAM_NAME;
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();

indexDocs(dataStreamName, 1);

assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { dataStreamName });
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
assertThat(backingIndices.size(), equalTo(2));
String backingIndex = backingIndices.get(0).getName();
assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
String writeIndex = backingIndices.get(1).getName();
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
});

ExplainDataStreamLifecycleAction.Request explainIndicesRequest = new ExplainDataStreamLifecycleAction.Request(
new String[] {
DataStream.getDefaultBackingIndexName(dataStreamName, 1),
DataStream.getDefaultBackingIndexName(dataStreamName, 2) }
);
ExplainDataStreamLifecycleAction.Response response = client().execute(
ExplainDataStreamLifecycleAction.INSTANCE,
explainIndicesRequest
).actionGet();
assertThat(response.getIndices().size(), is(2));
// we requested the explain for indices with the default include_defaults=false
assertThat(response.getRolloverConfiguration(), nullValue());
for (ExplainIndexDataStreamLifecycle explainIndex : response.getIndices()) {
assertThat(explainIndex.isManagedByLifecycle(), is(true));
assertThat(explainIndex.getIndexCreationDate(), notNullValue());
assertThat(explainIndex.getLifecycle(), notNullValue());
assertThat(
explainIndex.getLifecycle().getDataStreamRetention(),
equalTo(TimeValue.timeValueDays(SYSTEM_DATA_STREAM_RETENTION_DAYS))
);
}
} finally {
client().execute(DeleteDataStreamGlobalRetentionAction.INSTANCE, new DeleteDataStreamGlobalRetentionAction.Request());
}
}

public void testExplainLifecycleForIndicesWithErrors() throws Exception {
// empty lifecycle contains the default rollover
DataStreamLifecycle lifecycle = new DataStreamLifecycle();
Diff not shown for the remaining 17 changed files.