Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into schedule-reroute
Browse files Browse the repository at this point in the history
  • Loading branch information
imRishN committed Sep 3, 2024
2 parents 3ba4633 + 4516065 commit 10b083a
Show file tree
Hide file tree
Showing 149 changed files with 6,322 additions and 400 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325))
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
- Support cancellation for cat shards and node stats API.([#13966](https://github.com/opensearch-project/OpenSearch/pull/13966))
- [Streaming Indexing] Introduce bulk HTTP API streaming flavor ([#15381](https://github.com/opensearch-project/OpenSearch/pull/15381))
- Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124))
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))
Expand All @@ -43,6 +44,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))
- Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426))
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))
- Add support for comma-separated list of index names to be used with Snapshot Status API ([#15409](https://github.com/opensearch-project/OpenSearch/pull/15409))
- Add prefix support to hashed prefix & infix path types on remote store ([#15557](https://github.com/opensearch-project/OpenSearch/pull/15557))
- [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,15 @@ public void testSnapshotsStatus() {
Map<String, String> expectedParams = new HashMap<>();
String repository = RequestConvertersTests.randomIndicesNames(1, 1)[0];
String[] snapshots = RequestConvertersTests.randomIndicesNames(1, 5);
String[] indices = RequestConvertersTests.randomIndicesNames(1, 5);
StringBuilder snapshotNames = new StringBuilder(snapshots[0]);
for (int idx = 1; idx < snapshots.length; idx++) {
snapshotNames.append(",").append(snapshots[idx]);
}
boolean ignoreUnavailable = randomBoolean();
String endpoint = "/_snapshot/" + repository + "/" + snapshotNames.toString() + "/_status";

SnapshotsStatusRequest snapshotsStatusRequest = new SnapshotsStatusRequest(repository, snapshots);
SnapshotsStatusRequest snapshotsStatusRequest = new SnapshotsStatusRequest(repository, snapshots, indices);
RequestConvertersTests.setRandomClusterManagerTimeout(snapshotsStatusRequest, expectedParams);
snapshotsStatusRequest.ignoreUnavailable(ignoreUnavailable);
expectedParams.put("ignore_unavailable", Boolean.toString(ignoreUnavailable));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,18 @@
package org.opensearch.core.common.io.stream;

import org.apache.lucene.store.BufferedChecksum;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

Expand Down Expand Up @@ -90,4 +99,75 @@ public void reset() throws IOException {
public void resetDigest() {
digest.reset();
}

@Override
public void writeMap(@Nullable Map<String, Object> map) throws IOException {
Map<String, Object> newMap = new TreeMap<>(map);
writeGenericValue(newMap);
}

@Override
public <K, V> void writeMap(Map<K, V> map, final Writeable.Writer<K> keyWriter, final Writeable.Writer<V> valueWriter)
throws IOException {
writeVInt(map.size());
map.keySet().stream().sorted().forEachOrdered(key -> {
try {
keyWriter.write(this, key);
valueWriter.write(this, map.get(key));
} catch (IOException e) {
throw new RuntimeException("Failed to write map values.", e);
}
});
}

public <K, V> void writeMapValues(Map<K, V> map, final Writeable.Writer<V> valueWriter) throws IOException {
writeVInt(map.size());
map.keySet().stream().sorted().forEachOrdered(key -> {
try {
valueWriter.write(this, map.get(key));
} catch (IOException e) {
throw new RuntimeException("Failed to write map values.", e);
}
});
}

@Override
public void writeStringArray(String[] array) throws IOException {
String[] copyArray = Arrays.copyOf(array, array.length);
Arrays.sort(copyArray);
super.writeStringArray(copyArray);
}

@Override
public void writeVLongArray(long[] values) throws IOException {
long[] copyValues = Arrays.copyOf(values, values.length);
Arrays.sort(copyValues);
super.writeVLongArray(copyValues);
}

@Override
public void writeCollection(final Collection<? extends Writeable> collection) throws IOException {
List<? extends Writeable> sortedList = collection.stream().sorted().collect(Collectors.toList());
super.writeCollection(sortedList, (o, v) -> v.writeTo(o));
}

@Override
public void writeStringCollection(final Collection<String> collection) throws IOException {
List<String> listCollection = new ArrayList<>(collection);
Collections.sort(listCollection);
writeCollection(listCollection, StreamOutput::writeString);
}

@Override
public void writeOptionalStringCollection(final Collection<String> collection) throws IOException {
if (collection != null) {
List<String> listCollection = new ArrayList<>(collection);
Collections.sort(listCollection);
writeBoolean(true);
writeCollection(listCollection, StreamOutput::writeString);
} else {
writeBoolean(false);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ public final <K, V> void writeMapOfLists(final Map<K, List<V>> map, final Writer
* @param keyWriter The key writer
* @param valueWriter The value writer
*/
public final <K, V> void writeMap(final Map<K, V> map, final Writer<K> keyWriter, final Writer<V> valueWriter) throws IOException {
public <K, V> void writeMap(final Map<K, V> map, final Writer<K> keyWriter, final Writer<V> valueWriter) throws IOException {
writeVInt(map.size());
for (final Map.Entry<K, V> entry : map.entrySet()) {
keyWriter.write(this, entry.getKey());
Expand Down Expand Up @@ -969,9 +969,13 @@ public <T extends Writeable> void writeOptionalArray(@Nullable T[] array) throws
}

public void writeOptionalWriteable(@Nullable Writeable writeable) throws IOException {
writeOptionalWriteable((out, writable) -> writable.writeTo(out), writeable);
}

public <T extends Writeable> void writeOptionalWriteable(final Writer<T> writer, @Nullable T writeable) throws IOException {
if (writeable != null) {
writeBoolean(true);
writeable.writeTo(this);
writer.write(this, writeable);
} else {
writeBoolean(false);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.core.common.io.stream;

import java.io.IOException;

/**
* Provides a method for serialization which will give ordered stream, creating same byte array on every invocation.
* This should be invoked with a stream that provides ordered serialization.
*/
public interface VerifiableWriteable extends Writeable {

void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.upgrades;

import com.sun.jna.StringArray;
import org.opensearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus;
Expand All @@ -44,6 +45,7 @@
import org.opensearch.client.RestClient;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.common.xcontent.json.JsonXContent;
Expand Down Expand Up @@ -141,14 +143,14 @@ public void testCreateAndRestoreSnapshot() throws IOException {
case STEP2_NEW_CLUSTER:
case STEP4_NEW_CLUSTER:
assertSnapshotStatusSuccessful(client, repoName,
snapshots.stream().map(sn -> (String) sn.get("snapshot")).toArray(String[]::new));
snapshots.stream().map(sn -> (String) sn.get("snapshot")).toArray(String[]::new), Strings.EMPTY_ARRAY);
break;
case STEP1_OLD_CLUSTER:
assertSnapshotStatusSuccessful(client, repoName, "snapshot-" + TEST_STEP);
assertSnapshotStatusSuccessful(client, repoName, new String[] {"snapshot-" + TEST_STEP}, Strings.EMPTY_ARRAY);
break;
case STEP3_OLD_CLUSTER:
assertSnapshotStatusSuccessful(
client, repoName, "snapshot-" + TEST_STEP, "snapshot-" + TestStep.STEP3_OLD_CLUSTER);
client, repoName, new String[] {"snapshot-" + TEST_STEP, "snapshot-" + TestStep.STEP3_OLD_CLUSTER}, Strings.EMPTY_ARRAY);
break;
}
if (TEST_STEP == TestStep.STEP3_OLD_CLUSTER) {
Expand Down Expand Up @@ -186,10 +188,10 @@ public void testReadOnlyRepo() throws IOException {
break;
}
if (TEST_STEP == TestStep.STEP1_OLD_CLUSTER || TEST_STEP == TestStep.STEP3_OLD_CLUSTER) {
assertSnapshotStatusSuccessful(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER);
assertSnapshotStatusSuccessful(client, repoName, new String[] {"snapshot-" + TestStep.STEP1_OLD_CLUSTER}, Strings.EMPTY_ARRAY);
} else {
assertSnapshotStatusSuccessful(client, repoName,
"snapshot-" + TestStep.STEP1_OLD_CLUSTER, "snapshot-" + TestStep.STEP2_NEW_CLUSTER);
new String[] {"snapshot-" + TestStep.STEP1_OLD_CLUSTER, "snapshot-" + TestStep.STEP2_NEW_CLUSTER}, Strings.EMPTY_ARRAY);
}
if (TEST_STEP == TestStep.STEP3_OLD_CLUSTER) {
ensureSnapshotRestoreWorks(repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards);
Expand All @@ -214,7 +216,7 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException {
// Every step creates one snapshot
assertThat(snapshots, hasSize(TEST_STEP.ordinal() + 1));
assertSnapshotStatusSuccessful(client, repoName,
snapshots.stream().map(sn -> (String) sn.get("snapshot")).toArray(String[]::new));
snapshots.stream().map(sn -> (String) sn.get("snapshot")).toArray(String[]::new), Strings.EMPTY_ARRAY);
if (TEST_STEP == TestStep.STEP1_OLD_CLUSTER) {
ensureSnapshotRestoreWorks(repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards);
} else {
Expand All @@ -239,9 +241,9 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException {
}

private static void assertSnapshotStatusSuccessful(RestHighLevelClient client, String repoName,
String... snapshots) throws IOException {
String[] snapshots, String[] indices) throws IOException {
final SnapshotsStatusResponse statusResponse = client.snapshot()
.status(new SnapshotsStatusRequest(repoName, snapshots), RequestOptions.DEFAULT);
.status(new SnapshotsStatusRequest(repoName, snapshots, indices), RequestOptions.DEFAULT);
for (SnapshotStatus status : statusResponse.getSnapshots()) {
assertThat(status.getShardsStats().getFailedShards(), is(0));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,26 @@
"description":"A comma-separated list of snapshot names"
}
}
},
{
"path":"/_snapshot/{repository}/{snapshot}/{index}/_status",
"methods":[
"GET"
],
"parts":{
"repository":{
"type":"string",
"description":"A repository name"
},
"snapshot":{
"type":"string",
"description":"A snapshot name"
},
"index":{
"type": "list",
"description":"A comma-separated list of index names"
}
}
}
]
},
Expand All @@ -58,7 +78,7 @@
},
"ignore_unavailable":{
"type":"boolean",
"description":"Whether to ignore unavailable snapshots, defaults to false which means a SnapshotMissingException is thrown"
"description":"Whether to ignore unavailable snapshots and indices, defaults to false which means a SnapshotMissingException or IndexNotFoundException is thrown"
}
}
}
Expand Down
Loading

0 comments on commit 10b083a

Please sign in to comment.