Skip to content

Commit

Permalink
Merge branch 'main' into report_long_time_unassigned_jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
droberts195 committed Oct 3, 2023
2 parents 191c753 + 2e86d25 commit 20f0a6d
Show file tree
Hide file tree
Showing 34 changed files with 1,430 additions and 206 deletions.
2 changes: 2 additions & 0 deletions .ci/dockerOnLinuxExclusions
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
# excluded.
debian-8
opensuse-leap-15.1
opensuse-leap-15.5
ol-7.7
sles-12.3 # older version used in Vagrant image
sles-12.5
sles-15.1
sles-15.2
sles-15.3
sles-15.4
sles-15.5

# These OSes are deprecated and filtered starting with 8.0.0, but need to be excluded
# for PR checks
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/100106.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 100106
summary: Validate enrich index before completing policy execution
area: Ingest Node
type: bug
issues: []
5 changes: 5 additions & 0 deletions docs/changelog/100143.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 100143
summary: Preserve order of inference results when calling the _infer API with multiple inputs on a model deployment with more than one allocation the output results order was not guaranteed to match the input order. The fix ensures the output order matches the input order.
area: Machine Learning
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.cluster.routing;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.SimpleDiffable;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -231,6 +232,26 @@ public boolean allPrimaryShardsActive() {
return primaryShardsActive() == shards.length;
}

/**
* @return <code>true</code> if an index is available to service search queries.
*/
public boolean readyForSearch(ClusterState clusterState) {
for (IndexShardRoutingTable shardRoutingTable : this.shards) {
boolean found = false;
for (int idx = 0; idx < shardRoutingTable.size(); idx++) {
ShardRouting shardRouting = shardRoutingTable.shard(idx);
if (shardRouting.active() && OperationRouting.canSearchShard(shardRouting, clusterState)) {
found = true;
break;
}
}
if (found == false) {
return false;
}
}
return true;
}

public boolean allShardsActive() {
return this.allShardsActive;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource.ExistingStoreRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource;
Expand Down Expand Up @@ -930,6 +931,11 @@ public boolean isPromotableToPrimary() {
return role.isPromotableToPrimary();
}

/**
* Determine if role searchable. Consumers should prefer {@link OperationRouting#canSearchShard(ShardRouting, ClusterState)} to
* determine if a shard can be searched and {@link IndexRoutingTable#readyForSearch(ClusterState)} to determine if an index
* is ready to be searched.
*/
public boolean isSearchable() {
return role.isSearchable();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.routing;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.mockito.Mockito;

import java.util.List;

import static org.elasticsearch.index.IndexSettings.INDEX_FAST_REFRESH_SETTING;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class IndexRoutingTableTests extends ESTestCase {

public void testReadyForSearch() {
innerReadyForSearch(false);
innerReadyForSearch(true);
}

private void innerReadyForSearch(boolean fastRefresh) {
Index index = new Index(randomIdentifier(), UUIDs.randomBase64UUID());
ClusterState clusterState = mock(ClusterState.class, Mockito.RETURNS_DEEP_STUBS);
when(clusterState.metadata().index(any(Index.class)).getSettings()).thenReturn(
Settings.builder().put(INDEX_FAST_REFRESH_SETTING.getKey(), fastRefresh).build()
);
// 2 primaries that are search and index
ShardId p1 = new ShardId(index, 0);
IndexShardRoutingTable shardTable1 = new IndexShardRoutingTable(
p1,
List.of(getShard(p1, true, ShardRoutingState.STARTED, ShardRouting.Role.DEFAULT))
);
ShardId p2 = new ShardId(index, 1);
IndexShardRoutingTable shardTable2 = new IndexShardRoutingTable(
p2,
List.of(getShard(p2, true, ShardRoutingState.STARTED, ShardRouting.Role.DEFAULT))
);
IndexRoutingTable indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
assertTrue(indexRoutingTable.readyForSearch(clusterState));

// 2 primaries that are index only
shardTable1 = new IndexShardRoutingTable(p1, List.of(getShard(p1, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY)));
shardTable2 = new IndexShardRoutingTable(p2, List.of(getShard(p2, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY)));
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
if (fastRefresh) {
assertTrue(indexRoutingTable.readyForSearch(clusterState));
} else {
assertFalse(indexRoutingTable.readyForSearch(clusterState));
}

// 2 unassigned primaries that are index only
shardTable1 = new IndexShardRoutingTable(
p1,
List.of(getShard(p1, true, ShardRoutingState.UNASSIGNED, ShardRouting.Role.INDEX_ONLY))
);
shardTable2 = new IndexShardRoutingTable(
p2,
List.of(getShard(p2, true, ShardRoutingState.UNASSIGNED, ShardRouting.Role.INDEX_ONLY))
);
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
assertFalse(indexRoutingTable.readyForSearch(clusterState));

// 2 primaries that are index only with replicas that are not all available
shardTable1 = new IndexShardRoutingTable(
p1,
List.of(
getShard(p1, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY),
getShard(p1, false, ShardRoutingState.STARTED, ShardRouting.Role.SEARCH_ONLY),
getShard(p1, false, ShardRoutingState.STARTED, ShardRouting.Role.SEARCH_ONLY)
)
);
shardTable2 = new IndexShardRoutingTable(
p2,
List.of(
getShard(p2, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY),
getShard(p2, false, ShardRoutingState.UNASSIGNED, ShardRouting.Role.SEARCH_ONLY),
getShard(p2, false, ShardRoutingState.UNASSIGNED, ShardRouting.Role.SEARCH_ONLY)
)
);
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
if (fastRefresh) {
assertTrue(indexRoutingTable.readyForSearch(clusterState));
} else {
assertFalse(indexRoutingTable.readyForSearch(clusterState));
}

// 2 primaries that are index only with some replicas that are all available
shardTable1 = new IndexShardRoutingTable(
p1,
List.of(
getShard(p1, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY),
getShard(p1, false, ShardRoutingState.STARTED, ShardRouting.Role.SEARCH_ONLY),
getShard(p1, false, ShardRoutingState.STARTED, ShardRouting.Role.SEARCH_ONLY)
)
);
shardTable2 = new IndexShardRoutingTable(
p2,
List.of(
getShard(p2, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY),
getShard(p2, false, ShardRoutingState.STARTED, ShardRouting.Role.SEARCH_ONLY),
getShard(p2, false, ShardRoutingState.STARTED, ShardRouting.Role.SEARCH_ONLY)
)
);
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
assertTrue(indexRoutingTable.readyForSearch(clusterState));

// 2 unassigned primaries that are index only with some replicas that are all available
// Fast refresh indices do not support replicas so this can not practically happen. If we add support we will want to ensure
// that readyForSearch allows for searching replicas when the index shard is not available.
shardTable1 = new IndexShardRoutingTable(
p1,
List.of(
getShard(p1, true, ShardRoutingState.UNASSIGNED, ShardRouting.Role.INDEX_ONLY),
getShard(p1, false, ShardRoutingState.STARTED, ShardRouting.Role.SEARCH_ONLY),
getShard(p1, false, ShardRoutingState.STARTED, ShardRouting.Role.SEARCH_ONLY)
)
);
shardTable2 = new IndexShardRoutingTable(
p2,
List.of(
getShard(p2, true, ShardRoutingState.UNASSIGNED, ShardRouting.Role.INDEX_ONLY),
getShard(p2, false, ShardRoutingState.STARTED, ShardRouting.Role.SEARCH_ONLY),
getShard(p2, false, ShardRoutingState.STARTED, ShardRouting.Role.SEARCH_ONLY)
)
);
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
if (fastRefresh) {
assertFalse(indexRoutingTable.readyForSearch(clusterState)); // if we support replicas for fast refreshes this needs to change
} else {
assertTrue(indexRoutingTable.readyForSearch(clusterState));
}

// 2 primaries that are index only with at least 1 replica per primary that is available
shardTable1 = new IndexShardRoutingTable(
p1,
List.of(
getShard(p1, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY),
getShard(p1, false, ShardRoutingState.STARTED, ShardRouting.Role.SEARCH_ONLY),
getShard(p1, false, ShardRoutingState.UNASSIGNED, ShardRouting.Role.SEARCH_ONLY)
)
);
shardTable2 = new IndexShardRoutingTable(
p2,
List.of(
getShard(p2, true, ShardRoutingState.STARTED, ShardRouting.Role.INDEX_ONLY),
getShard(p2, false, ShardRoutingState.UNASSIGNED, ShardRouting.Role.SEARCH_ONLY),
getShard(p2, false, ShardRoutingState.STARTED, ShardRouting.Role.SEARCH_ONLY)
)
);
indexRoutingTable = new IndexRoutingTable(index, new IndexShardRoutingTable[] { shardTable1, shardTable2 });
assertTrue(indexRoutingTable.readyForSearch(clusterState));
}

private ShardRouting getShard(ShardId shardId, boolean isPrimary, ShardRoutingState state, ShardRouting.Role role) {
return new ShardRouting(
shardId,
state == ShardRoutingState.UNASSIGNED ? null : randomIdentifier(),
state == ShardRoutingState.UNASSIGNED || state == ShardRoutingState.STARTED ? null : randomIdentifier(),
isPrimary,
state,
TestShardRouting.buildRecoveryTarget(isPrimary, state),
TestShardRouting.buildUnassignedInfo(state),
TestShardRouting.buildRelocationFailureInfo(state),
TestShardRouting.buildAllocationId(state),
randomLongBetween(-1, 1024),
role
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.xpack.core.ml.inference.preprocessing.StrictlyParsedPreProcessor;
import org.elasticsearch.xpack.core.ml.inference.preprocessing.TargetMeanEncoding;
import org.elasticsearch.xpack.core.ml.inference.results.ClassificationInferenceResults;
import org.elasticsearch.xpack.core.ml.inference.results.ErrorInferenceResults;
import org.elasticsearch.xpack.core.ml.inference.results.FillMaskResults;
import org.elasticsearch.xpack.core.ml.inference.results.NerResults;
import org.elasticsearch.xpack.core.ml.inference.results.NlpClassificationInferenceResults;
Expand Down Expand Up @@ -639,6 +640,9 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
namedWriteables.add(
new NamedWriteableRegistry.Entry(InferenceResults.class, WarningInferenceResults.NAME, WarningInferenceResults::new)
);
namedWriteables.add(
new NamedWriteableRegistry.Entry(InferenceResults.class, ErrorInferenceResults.NAME, ErrorInferenceResults::new)
);
namedWriteables.add(new NamedWriteableRegistry.Entry(InferenceResults.class, NerResults.NAME, NerResults::new));
namedWriteables.add(new NamedWriteableRegistry.Entry(InferenceResults.class, FillMaskResults.NAME, FillMaskResults::new));
namedWriteables.add(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.ml.inference.results;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.inference.InferenceResults;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public class ErrorInferenceResults implements InferenceResults {

public static final String NAME = "error";
public static final ParseField WARNING = new ParseField("error");

private final Exception exception;

public ErrorInferenceResults(Exception exception) {
this.exception = Objects.requireNonNull(exception);
}

public ErrorInferenceResults(StreamInput in) throws IOException {
this.exception = in.readException();
}

public Exception getException() {
return exception;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeException(exception);
}

@Override
public boolean equals(Object object) {
if (object == this) {
return true;
}
if (object == null || getClass() != object.getClass()) {
return false;
}
ErrorInferenceResults that = (ErrorInferenceResults) object;
// Just compare the message for serialization test purposes
return Objects.equals(exception.getMessage(), that.exception.getMessage());
}

@Override
public int hashCode() {
// Just compare the message for serialization test purposes
return Objects.hash(exception.getMessage());
}

@Override
public String getResultsField() {
return NAME;
}

@Override
public Map<String, Object> asMap() {
Map<String, Object> asMap = new LinkedHashMap<>();
asMap.put(NAME, exception.getMessage());
return asMap;
}

@Override
public String toString() {
return Strings.toString(this);
}

@Override
public Object predictedValue() {
return null;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(NAME, exception.getMessage());
return builder;
}

@Override
public String getWriteableName() {
return NAME;
}
}
Loading

0 comments on commit 20f0a6d

Please sign in to comment.