Skip to content

Commit

Permalink
[Failure store - selector syntax] Introduce the ::* selector (elas…
Browse files Browse the repository at this point in the history
…tic#115389)

**Introduction**

> In order to make adoption of failure stores simpler for all users, we
are introducing a new syntactical feature to index expression
resolution: The selector. > > Selectors, denoted with a :: followed by a
recognized suffix will allow users to specify which component of an
index abstraction they would like to operate on within an API call. In
this case, an index abstraction is a concrete index, data stream, or
alias; Any abstraction that can be resolved to a set of indices/shards.
We define a component of an index abstraction to be some searchable unit
of the index abstraction. > > To start, we will support two components:
data and failures. Concrete indices are their own data components, while
the data component for index aliases are all of the indices contained
therein. For data streams, the data component corresponds to their
backing indices. Data stream aliases mirror this, treating all backing
indices of the data streams they correspond to as their data component.
>  > The failure component is only supported by data streams and data
stream aliases. The failure component of these abstractions refer to the
data streams' failure stores. Indices and index aliases do not have a
failure component.

For more details and examples see
elastic#113144. All this work has
been cherry picked from there.

**Purpose of this PR**

This PR is introducing the `::*` as another selector option and not as a
combination of `::data` and `::failure`. The reason for this change is
that we need to differentiate between:

- `my-index::*` which should resolve to `my-index::data` only and not to `my-index::failures` and
- a user explicitly requesting `my-index::data, my-index::failures` which should result potentially to an error.
  • Loading branch information
gmarouli authored and davidkyle committed Oct 24, 2024
1 parent a6cb9b3 commit 22d1690
Show file tree
Hide file tree
Showing 21 changed files with 197 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void setup() throws Exception {
// Initialize the failure store.
RolloverRequest rolloverRequest = new RolloverRequest("with-fs", null);
rolloverRequest.setIndicesOptions(
IndicesOptions.builder(rolloverRequest.indicesOptions()).selectorOptions(IndicesOptions.SelectorOptions.ONLY_FAILURES).build()
IndicesOptions.builder(rolloverRequest.indicesOptions()).selectorOptions(IndicesOptions.SelectorOptions.FAILURES).build()
);
response = client.execute(RolloverAction.INSTANCE, rolloverRequest).get();
assertTrue(response.isAcknowledged());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public void testRejectionFromFailureStore() throws IOException {
// Initialize failure store.
var rolloverRequest = new RolloverRequest(dataStream, null);
rolloverRequest.setIndicesOptions(
IndicesOptions.builder(rolloverRequest.indicesOptions()).selectorOptions(IndicesOptions.SelectorOptions.ONLY_FAILURES).build()
IndicesOptions.builder(rolloverRequest.indicesOptions()).selectorOptions(IndicesOptions.SelectorOptions.FAILURES).build()
);
var rolloverResponse = client().execute(RolloverAction.INSTANCE, rolloverRequest).actionGet();
var failureStoreIndex = rolloverResponse.getNewIndex();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ private Set<Index> maybeExecuteForceMerge(ClusterState state, List<Index> indice
UpdateSettingsRequest updateMergePolicySettingsRequest = new UpdateSettingsRequest();
updateMergePolicySettingsRequest.indicesOptions(
IndicesOptions.builder(updateMergePolicySettingsRequest.indicesOptions())
.selectorOptions(IndicesOptions.SelectorOptions.DATA_AND_FAILURE)
.selectorOptions(IndicesOptions.SelectorOptions.ALL_APPLICABLE)
.build()
);
updateMergePolicySettingsRequest.indices(indexName);
Expand Down Expand Up @@ -1408,9 +1408,7 @@ static RolloverRequest getDefaultRolloverRequest(
RolloverRequest rolloverRequest = new RolloverRequest(dataStream, null).masterNodeTimeout(TimeValue.MAX_VALUE);
if (rolloverFailureStore) {
rolloverRequest.setIndicesOptions(
IndicesOptions.builder(rolloverRequest.indicesOptions())
.selectorOptions(IndicesOptions.SelectorOptions.ONLY_FAILURES)
.build()
IndicesOptions.builder(rolloverRequest.indicesOptions()).selectorOptions(IndicesOptions.SelectorOptions.FAILURES).build()
);
}
rolloverRequest.setConditions(rolloverConfiguration.resolveRolloverConditions(dataRetention));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,11 @@ public void testOperationsExecutedOnce() {
assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class));
RolloverRequest rolloverBackingIndexRequest = (RolloverRequest) clientSeenRequests.get(0);
assertThat(rolloverBackingIndexRequest.getRolloverTarget(), is(dataStreamName));
assertThat(rolloverBackingIndexRequest.indicesOptions().selectorOptions(), equalTo(IndicesOptions.SelectorOptions.ONLY_DATA));
assertThat(rolloverBackingIndexRequest.indicesOptions().selectorOptions(), equalTo(IndicesOptions.SelectorOptions.DATA));
assertThat(clientSeenRequests.get(1), instanceOf(RolloverRequest.class));
RolloverRequest rolloverFailureIndexRequest = (RolloverRequest) clientSeenRequests.get(1);
assertThat(rolloverFailureIndexRequest.getRolloverTarget(), is(dataStreamName));
assertThat(rolloverFailureIndexRequest.indicesOptions().selectorOptions(), equalTo(IndicesOptions.SelectorOptions.ONLY_FAILURES));
assertThat(rolloverFailureIndexRequest.indicesOptions().selectorOptions(), equalTo(IndicesOptions.SelectorOptions.FAILURES));
List<DeleteIndexRequest> deleteRequests = clientSeenRequests.subList(2, 5)
.stream()
.map(transportRequest -> (DeleteIndexRequest) transportRequest)
Expand Down Expand Up @@ -1546,11 +1546,11 @@ public void testFailureStoreIsManagedEvenWhenDisabled() {
assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class));
RolloverRequest rolloverBackingIndexRequest = (RolloverRequest) clientSeenRequests.get(0);
assertThat(rolloverBackingIndexRequest.getRolloverTarget(), is(dataStreamName));
assertThat(rolloverBackingIndexRequest.indicesOptions().selectorOptions(), equalTo(IndicesOptions.SelectorOptions.ONLY_DATA));
assertThat(rolloverBackingIndexRequest.indicesOptions().selectorOptions(), equalTo(IndicesOptions.SelectorOptions.DATA));
assertThat(clientSeenRequests.get(1), instanceOf(RolloverRequest.class));
RolloverRequest rolloverFailureIndexRequest = (RolloverRequest) clientSeenRequests.get(1);
assertThat(rolloverFailureIndexRequest.getRolloverTarget(), is(dataStreamName));
assertThat(rolloverFailureIndexRequest.indicesOptions().selectorOptions(), equalTo(IndicesOptions.SelectorOptions.ONLY_FAILURES));
assertThat(rolloverFailureIndexRequest.indicesOptions().selectorOptions(), equalTo(IndicesOptions.SelectorOptions.FAILURES));
assertThat(
((DeleteIndexRequest) clientSeenRequests.get(2)).indices()[0],
is(dataStream.getFailureIndices().getIndices().get(0).getName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_FIELD_ATTRIBUTE_PARENT_SIMPLIFIED = def(8_775_00_0);
public static final TransportVersion INFERENCE_DONT_PERSIST_ON_READ = def(8_776_00_0);
public static final TransportVersion SIMULATE_MAPPING_ADDITION = def(8_777_00_0);
public static final TransportVersion INTRODUCE_ALL_APPLICABLE_SELECTOR = def(8_778_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public GetIndexRequest() {
super(
DataStream.isFailureStoreFeatureFlagEnabled()
? IndicesOptions.builder(IndicesOptions.strictExpandOpen())
.selectorOptions(IndicesOptions.SelectorOptions.DATA_AND_FAILURE)
.selectorOptions(IndicesOptions.SelectorOptions.ALL_APPLICABLE)
.build()
: IndicesOptions.strictExpandOpen()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndexComponentSelector;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.cluster.metadata.DataStream;
Expand Down Expand Up @@ -124,8 +125,8 @@ public ActionRequestValidationException validate() {
);
}

var selectors = indicesOptions.selectorOptions().defaultSelectors();
if (selectors.size() > 1) {
var selector = indicesOptions.selectorOptions().defaultSelector();
if (selector == IndexComponentSelector.ALL_APPLICABLE) {
validationException = addValidationError(
"rollover cannot be applied to both regular and failure indices at the same time",
validationException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ private void rollOverFailureStores(Runnable runnable) {
RolloverRequest rolloverRequest = new RolloverRequest(dataStream, null);
rolloverRequest.setIndicesOptions(
IndicesOptions.builder(rolloverRequest.indicesOptions())
.selectorOptions(IndicesOptions.SelectorOptions.ONLY_FAILURES)
.selectorOptions(IndicesOptions.SelectorOptions.FAILURES)
.build()
);
// We are executing a lazy rollover because it is an action specialised for this situation, when we want an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ private void rollOverDataStreams(
if (targetFailureStore) {
rolloverRequest.setIndicesOptions(
IndicesOptions.builder(rolloverRequest.indicesOptions())
.selectorOptions(IndicesOptions.SelectorOptions.ONLY_FAILURES)
.selectorOptions(IndicesOptions.SelectorOptions.FAILURES)
.build()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public Request() {
.allowFailureIndices(true)
.build()
)
.selectorOptions(IndicesOptions.SelectorOptions.DATA_AND_FAILURE)
.selectorOptions(IndicesOptions.SelectorOptions.ALL_APPLICABLE)
.build()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@

package org.elasticsearch.action.support;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -17,33 +23,82 @@
* We define as index components the two different sets of indices a data stream could consist of:
* - DATA: represents the backing indices
* - FAILURES: represent the failing indices
* - ALL: represents all available in this expression components, meaning if it's a data stream both backing and failure indices and if it's
* an index only the index itself.
* Note: An index is its own DATA component, but it cannot have a FAILURE component.
*/
public enum IndexComponentSelector {
DATA("data"),
FAILURES("failures");
public enum IndexComponentSelector implements Writeable {
DATA("data", (byte) 0),
FAILURES("failures", (byte) 1),
ALL_APPLICABLE("*", (byte) 2);

private final String key;
private final byte id;

IndexComponentSelector(String key) {
IndexComponentSelector(String key, byte id) {
this.key = key;
this.id = id;
}

public String getKey() {
return key;
}

private static final Map<String, IndexComponentSelector> REGISTRY;
public byte getId() {
return id;
}

private static final Map<String, IndexComponentSelector> KEY_REGISTRY;
private static final Map<Byte, IndexComponentSelector> ID_REGISTRY;

static {
Map<String, IndexComponentSelector> registry = new HashMap<>(IndexComponentSelector.values().length);
Map<String, IndexComponentSelector> keyRegistry = new HashMap<>(IndexComponentSelector.values().length);
for (IndexComponentSelector value : IndexComponentSelector.values()) {
keyRegistry.put(value.getKey(), value);
}
KEY_REGISTRY = Collections.unmodifiableMap(keyRegistry);
Map<Byte, IndexComponentSelector> idRegistry = new HashMap<>(IndexComponentSelector.values().length);
for (IndexComponentSelector value : IndexComponentSelector.values()) {
registry.put(value.getKey(), value);
idRegistry.put(value.getId(), value);
}
REGISTRY = Collections.unmodifiableMap(registry);
ID_REGISTRY = Collections.unmodifiableMap(idRegistry);
}

/**
* Retrieves the respective selector when the suffix key is recognised
* @param key the suffix key, probably parsed from an expression
* @return the selector or null if the key was not recognised.
*/
@Nullable
public static IndexComponentSelector getByKey(String key) {
return REGISTRY.get(key);
return KEY_REGISTRY.get(key);
}

public static IndexComponentSelector read(StreamInput in) throws IOException {
return getById(in.readByte());
}

// Visible for testing
static IndexComponentSelector getById(byte id) {
IndexComponentSelector indexComponentSelector = ID_REGISTRY.get(id);
if (indexComponentSelector == null) {
throw new IllegalArgumentException(
"Unknown id of index component selector [" + id + "], available options are: " + ID_REGISTRY
);
}
return indexComponentSelector;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeByte(id);
}

public boolean shouldIncludeData() {
return this == ALL_APPLICABLE || this == DATA;
}

public boolean shouldIncludeFailures() {
return this == ALL_APPLICABLE || this == FAILURES;
}
}
Loading

0 comments on commit 22d1690

Please sign in to comment.