Skip to content

Commit

Permalink
Merge branch 'main' into kderusso/semantic-text-match-query-support
Browse files Browse the repository at this point in the history
  • Loading branch information
kderusso authored Dec 11, 2024
2 parents f20a353 + b85e649 commit 3e3cb2e
Show file tree
Hide file tree
Showing 60 changed files with 556 additions and 829 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/118375.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 118375
summary: Check for presence of error object when validating streaming responses from integrations in the inference API
area: Machine Learning
type: enhancement
issues: []
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,6 @@ tests:
issue: https://github.com/elastic/elasticsearch/issues/118220
- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT
issue: https://github.com/elastic/elasticsearch/issues/118238
- class: org.elasticsearch.reservedstate.service.FileSettingsServiceTests
method: testInvalidJSON
issue: https://github.com/elastic/elasticsearch/issues/116521

# Examples:
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.xcontent.XContentParser;
import org.junit.After;
import org.junit.Before;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

import java.io.IOException;
Expand Down Expand Up @@ -315,7 +316,13 @@ public void testInvalidJSON() throws Exception {
writeTestFile(fileSettingsService.watchedFile(), "test_invalid_JSON");
awaitOrBust(fileChangeBarrier);

verify(fileSettingsService, times(1)).onProcessFileChangesException(
// These checks use atLeast(1) because the initial JSON is also invalid,
// and so we sometimes get two calls to these error-reporting methods
// depending on timing. Rather than trace down the root cause and fix
// it, we tolerate this for now because, hey, invalid JSON is invalid JSON
// and this is still testing what we want to test.

verify(fileSettingsService, Mockito.atLeast(1)).onProcessFileChangesException(
argThat(e -> unwrapException(e) instanceof XContentParseException)
);

Expand All @@ -324,7 +331,7 @@ public void testInvalidJSON() throws Exception {
// of the watcher thread itself, which occurs asynchronously when clusterChanged is first called.

assertEquals(YELLOW, healthIndicatorService.calculate(false, null).status());
verify(healthIndicatorService).failureOccurred(contains(XContentParseException.class.getName()));
verify(healthIndicatorService, Mockito.atLeast(1)).failureOccurred(contains(XContentParseException.class.getName()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,6 @@ public boolean isCompletable() {
return true;
}

public static class Result {
private final boolean complete;
private final ToXContentObject informationContext;

public Result(boolean complete, ToXContentObject informationContext) {
this.complete = complete;
this.informationContext = informationContext;
}

public boolean isComplete() {
return complete;
}

public ToXContentObject getInformationContext() {
return informationContext;
}
}
public record Result(boolean complete, ToXContentObject informationContext) {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public Result isConditionMet(Index index, ClusterState clusterState) {

Result stepResult = stepToExecute.isConditionMet(index, clusterState);

if (stepResult.isComplete() == false) {
if (stepResult.complete() == false) {
// checking the threshold after we execute the step to make sure we execute the wrapped step at least once (because time is a
// wonderful thing)
TimeValue retryThreshold = LifecycleSettings.LIFECYCLE_STEP_WAIT_TIME_THRESHOLD_SETTING.get(idxMeta.getSettings());
Expand All @@ -77,7 +77,7 @@ public Result isConditionMet(Index index, ClusterState clusterState) {
getKey().name(),
getKey().action(),
idxMeta.getIndex().getName(),
Strings.toString(stepResult.getInformationContext()),
Strings.toString(stepResult.informationContext()),
nextKeyOnThresholdBreach
);
logger.debug(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,29 +114,20 @@ public boolean equals(Object obj) {
return super.equals(obj) && Objects.equals(maxNumSegments, other.maxNumSegments);
}

public static class Info implements ToXContentObject {

private final long numberShardsLeftToMerge;
public record Info(long numberShardsLeftToMerge) implements ToXContentObject {

static final ParseField SHARDS_TO_MERGE = new ParseField("shards_left_to_merge");
static final ParseField MESSAGE = new ParseField("message");
static final ConstructingObjectParser<Info, Void> PARSER = new ConstructingObjectParser<>(
"segment_count_step_info",
a -> new Info((long) a[0])
);

static {
PARSER.declareLong(ConstructingObjectParser.constructorArg(), SHARDS_TO_MERGE);
PARSER.declareString((i, s) -> {}, MESSAGE);
}

public Info(long numberShardsLeftToMerge) {
this.numberShardsLeftToMerge = numberShardsLeftToMerge;
}

public long getNumberShardsLeftToMerge() {
return numberShardsLeftToMerge;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand All @@ -150,23 +141,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

@Override
public int hashCode() {
return Objects.hash(numberShardsLeftToMerge);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Info other = (Info) obj;
return Objects.equals(numberShardsLeftToMerge, other.numberShardsLeftToMerge);
}

@Override
public String toString() {
return Strings.toString(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ilm.UnfollowAction.CCR_METADATA_KEY;
Expand Down Expand Up @@ -70,9 +69,9 @@ static void handleResponse(FollowStatsAction.StatsResponses responses, Listener
if (conditionMet) {
listener.onResponse(true, null);
} else {
List<Info.ShardFollowTaskInfo> shardFollowTaskInfos = unSyncedShardFollowStatuses.stream()
List<ShardFollowTaskInfo> shardFollowTaskInfos = unSyncedShardFollowStatuses.stream()
.map(
status -> new Info.ShardFollowTaskInfo(
status -> new ShardFollowTaskInfo(
status.followerIndex(),
status.getShardId(),
status.leaderGlobalCheckpoint(),
Expand All @@ -84,21 +83,11 @@ static void handleResponse(FollowStatsAction.StatsResponses responses, Listener
}
}

static final class Info implements ToXContentObject {
record Info(List<ShardFollowTaskInfo> shardFollowTaskInfos) implements ToXContentObject {

static final ParseField SHARD_FOLLOW_TASKS = new ParseField("shard_follow_tasks");
static final ParseField MESSAGE = new ParseField("message");

private final List<ShardFollowTaskInfo> shardFollowTaskInfos;

Info(List<ShardFollowTaskInfo> shardFollowTaskInfos) {
this.shardFollowTaskInfos = shardFollowTaskInfos;
}

List<ShardFollowTaskInfo> getShardFollowTaskInfos() {
return shardFollowTaskInfos;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand All @@ -114,85 +103,30 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Info info = (Info) o;
return Objects.equals(shardFollowTaskInfos, info.shardFollowTaskInfos);
}

@Override
public int hashCode() {
return Objects.hash(shardFollowTaskInfos);
}

@Override
public String toString() {
return Strings.toString(this);
}
}

static final class ShardFollowTaskInfo implements ToXContentObject {

static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");
static final ParseField SHARD_ID_FIELD = new ParseField("shard_id");
static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField("leader_global_checkpoint");
static final ParseField FOLLOWER_GLOBAL_CHECKPOINT_FIELD = new ParseField("follower_global_checkpoint");

private final String followerIndex;
private final int shardId;
private final long leaderGlobalCheckpoint;
private final long followerGlobalCheckpoint;

ShardFollowTaskInfo(String followerIndex, int shardId, long leaderGlobalCheckpoint, long followerGlobalCheckpoint) {
this.followerIndex = followerIndex;
this.shardId = shardId;
this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
this.followerGlobalCheckpoint = followerGlobalCheckpoint;
}

String getFollowerIndex() {
return followerIndex;
}

int getShardId() {
return shardId;
}

long getLeaderGlobalCheckpoint() {
return leaderGlobalCheckpoint;
}

long getFollowerGlobalCheckpoint() {
return followerGlobalCheckpoint;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
builder.field(SHARD_ID_FIELD.getPreferredName(), shardId);
builder.field(LEADER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), leaderGlobalCheckpoint);
builder.field(FOLLOWER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), followerGlobalCheckpoint);
builder.endObject();
return builder;
}
record ShardFollowTaskInfo(String followerIndex, int shardId, long leaderGlobalCheckpoint, long followerGlobalCheckpoint)
implements
ToXContentObject {

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShardFollowTaskInfo that = (ShardFollowTaskInfo) o;
return shardId == that.shardId
&& leaderGlobalCheckpoint == that.leaderGlobalCheckpoint
&& followerGlobalCheckpoint == that.followerGlobalCheckpoint
&& Objects.equals(followerIndex, that.followerIndex);
}
static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");
static final ParseField SHARD_ID_FIELD = new ParseField("shard_id");
static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField("leader_global_checkpoint");
static final ParseField FOLLOWER_GLOBAL_CHECKPOINT_FIELD = new ParseField("follower_global_checkpoint");

@Override
public int hashCode() {
return Objects.hash(followerIndex, shardId, leaderGlobalCheckpoint, followerGlobalCheckpoint);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
builder.field(SHARD_ID_FIELD.getPreferredName(), shardId);
builder.field(LEADER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), leaderGlobalCheckpoint);
builder.field(FOLLOWER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), followerGlobalCheckpoint);
builder.endObject();
return builder;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,15 @@
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

/**
* Represents the state of an index's shards allocation, including a user friendly message describing the current state.
* It allows to transfer the allocation information to {@link org.elasticsearch.xcontent.XContent} using
* {@link #toXContent(XContentBuilder, Params)}
*/
public class AllocationInfo implements ToXContentObject {

private final long numberOfReplicas;
private final long numberShardsLeftToAllocate;
private final boolean allShardsActive;
private final String message;
public record AllocationInfo(long numberOfReplicas, long numberShardsLeftToAllocate, boolean allShardsActive, String message)
implements
ToXContentObject {

static final ParseField NUMBER_OF_REPLICAS = new ParseField("number_of_replicas");
static final ParseField SHARDS_TO_ALLOCATE = new ParseField("shards_left_to_allocate");
Expand All @@ -44,13 +40,6 @@ public class AllocationInfo implements ToXContentObject {
PARSER.declareString(ConstructingObjectParser.constructorArg(), MESSAGE);
}

public AllocationInfo(long numberOfReplicas, long numberShardsLeftToAllocate, boolean allShardsActive, String message) {
this.numberOfReplicas = numberOfReplicas;
this.numberShardsLeftToAllocate = numberShardsLeftToAllocate;
this.allShardsActive = allShardsActive;
this.message = message;
}

/**
* Builds the AllocationInfo representing a cluster state with a routing table that does not have enough shards active for a
* particular index.
Expand All @@ -72,22 +61,6 @@ public static AllocationInfo allShardsActiveAllocationInfo(long numReplicas, lon
);
}

public long getNumberOfReplicas() {
return numberOfReplicas;
}

public long getNumberShardsLeftToAllocate() {
return numberShardsLeftToAllocate;
}

public boolean allShardsActive() {
return allShardsActive;
}

public String getMessage() {
return message;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand All @@ -99,26 +72,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

@Override
public int hashCode() {
return Objects.hash(numberOfReplicas, numberShardsLeftToAllocate, allShardsActive);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
AllocationInfo other = (AllocationInfo) obj;
return Objects.equals(numberOfReplicas, other.numberOfReplicas)
&& Objects.equals(numberShardsLeftToAllocate, other.numberShardsLeftToAllocate)
&& Objects.equals(message, other.message)
&& Objects.equals(allShardsActive, other.allShardsActive);
}

@Override
public String toString() {
return Strings.toString(this);
Expand Down
Loading

0 comments on commit 3e3cb2e

Please sign in to comment.