Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear templates before Adding; Use NamedWriteableAwareStreamInput for RemoteCustomMetadata; Correct the check for deciding upload of HashesOfConsistentSettings #14513

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1287,6 +1287,7 @@ public Builder templates(Map<String, IndexTemplateMetadata> templates) {
}

public Builder templates(TemplatesMetadata templatesMetadata) {
this.templates.clear();
this.templates.putAll(templatesMetadata.getTemplates());
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
&& clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges();
final boolean updateClusterBlocks = isPublicationEnabled && !clusterState.blocks().equals(previousClusterState.blocks());
final boolean updateHashesOfConsistentSettings = isPublicationEnabled
|| Metadata.isHashesOfConsistentSettingsEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
&& Metadata.isHashesOfConsistentSettingsEqual(previousClusterState.metadata(), clusterState.metadata()) == false;
soosinha marked this conversation as resolved.
Show resolved Hide resolved

uploadedMetadataResults = writeMetadataInParallel(
clusterState,
Expand Down Expand Up @@ -476,7 +476,8 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
return manifestDetails;
}

private UploadedMetadataResults writeMetadataInParallel(
// package private for testing
UploadedMetadataResults writeMetadataInParallel(
ClusterState clusterState,
List<IndexMetadata> indexToUpload,
Map<String, IndexMetadata> prevIndexMetadataByName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.common.io.Streams;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.BlobPathParameters;
import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.compress.Compressor;
Expand Down Expand Up @@ -122,6 +123,8 @@ public UploadedMetadata getUploadedMetadata() {

public static Custom readFrom(StreamInput streamInput, NamedWriteableRegistry namedWriteableRegistry, String customType)
throws IOException {
return namedWriteableRegistry.getReader(Custom.class, customType).read(streamInput);
try (StreamInput in = new NamedWriteableAwareStreamInput(streamInput, namedWriteableRegistry)) {
return namedWriteableRegistry.getReader(Custom.class, customType).read(in);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,33 @@ public void testIsSegmentReplicationDisabled() {
assertFalse(metadata.isSegmentReplicationEnabled(indexName));
}

public void testTemplatesMetadata() {
TemplatesMetadata templatesMetadata1 = TemplatesMetadata.builder()
.put(
IndexTemplateMetadata.builder("template_1")
.patterns(Arrays.asList("bar-*", "foo-*"))
.settings(Settings.builder().put("random_index_setting_" + randomAlphaOfLength(3), randomAlphaOfLength(5)).build())
.build()
)
.build();
Metadata metadata1 = Metadata.builder().templates(templatesMetadata1).build();
assertThat(metadata1.templates(), is(templatesMetadata1.getTemplates()));

TemplatesMetadata templatesMetadata2 = TemplatesMetadata.builder()
.put(
IndexTemplateMetadata.builder("template_2")
.patterns(Arrays.asList("bar-*", "foo-*"))
.settings(Settings.builder().put("random_index_setting_" + randomAlphaOfLength(3), randomAlphaOfLength(5)).build())
.build()
)
.build();

Metadata metadata2 = Metadata.builder(metadata1).templates(templatesMetadata2).build();

assertThat(metadata2.templates(), is(templatesMetadata2.getTemplates()));

}

public static Metadata randomMetadata() {
Metadata.Builder md = Metadata.builder()
.put(buildIndexMetadata("index", "alias", randomBoolean() ? null : randomBoolean()).build(), randomBoolean())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.cluster.metadata.IndexTemplateMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.TemplatesMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService;
Expand Down Expand Up @@ -92,6 +93,7 @@

import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

import static java.util.stream.Collectors.toList;
import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
Expand All @@ -111,13 +113,15 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -518,11 +522,13 @@ public void testWriteIncrementalMetadataSuccess() throws IOException {
final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().indices(Collections.emptyList()).build();

remoteClusterStateService.start();
final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata(
final RemoteClusterStateService rcssSpy = Mockito.spy(remoteClusterStateService);
final RemoteClusterStateManifestInfo manifestInfo = rcssSpy.writeIncrementalMetadata(
previousClusterState,
clusterState,
previousManifest
).getClusterMetadataManifest();
);
final ClusterMetadataManifest manifest = manifestInfo.getClusterMetadataManifest();
final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename__2");
final List<UploadedIndexMetadata> indices = List.of(uploadedIndexMetadata);

Expand All @@ -535,6 +541,24 @@ public void testWriteIncrementalMetadataSuccess() throws IOException {
.previousClusterUUID("prev-cluster-uuid")
.build();

Mockito.verify(rcssSpy)
.writeMetadataInParallel(
eq(clusterState),
eq(new ArrayList<IndexMetadata>(clusterState.metadata().indices().values())),
eq(Collections.singletonMap(indices.get(0).getIndexName(), null)),
eq(clusterState.metadata().customs()),
eq(true),
eq(true),
eq(true),
eq(false),
eq(false),
eq(false),
eq(Collections.emptyMap()),
eq(false),
eq(Collections.emptyList())
);

assertThat(manifestInfo.getManifestFileName(), notNullValue());
assertThat(manifest.getIndices().size(), is(1));
assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName()));
assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID()));
Expand All @@ -543,6 +567,95 @@ public void testWriteIncrementalMetadataSuccess() throws IOException {
assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion()));
assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID()));
assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID()));
assertThat(manifest.getHashesOfConsistentSettings(), nullValue());
assertThat(manifest.getDiscoveryNodesMetadata(), nullValue());
assertThat(manifest.getClusterBlocksMetadata(), nullValue());
assertThat(manifest.getClusterStateCustomMap(), anEmptyMap());
assertThat(manifest.getTransientSettingsMetadata(), nullValue());
assertThat(manifest.getTemplatesMetadata(), notNullValue());
assertThat(manifest.getCoordinationMetadata(), notNullValue());
assertThat(manifest.getCustomMetadataMap().size(), is(2));
assertThat(manifest.getIndicesRouting().size(), is(0));
}

public void testWriteIncrementalMetadataSuccessWhenPublicationEnabled() throws IOException {
publicationEnabled = true;
Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, publicationEnabled).build();
FeatureFlags.initializeFeatureFlags(nodeSettings);
remoteClusterStateService = new RemoteClusterStateService(
"test-node-id",
repositoriesServiceSupplier,
settings,
clusterService,
() -> 0L,
threadPool,
List.of(new RemoteIndexPathUploader(threadPool, settings, repositoriesServiceSupplier, clusterSettings)),
writableRegistry()
);
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
mockBlobStoreObjects();
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build();
final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT)
.metadata(Metadata.builder().coordinationMetadata(coordinationMetadata))
.build();

final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().indices(Collections.emptyList()).build();

remoteClusterStateService.start();
final RemoteClusterStateService rcssSpy = Mockito.spy(remoteClusterStateService);
final RemoteClusterStateManifestInfo manifestInfo = rcssSpy.writeIncrementalMetadata(
previousClusterState,
clusterState,
previousManifest
);
final ClusterMetadataManifest manifest = manifestInfo.getClusterMetadataManifest();
final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename__2");
final List<UploadedIndexMetadata> indices = List.of(uploadedIndexMetadata);

final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder()
.indices(indices)
.clusterTerm(1L)
.stateVersion(1L)
.stateUUID("state-uuid")
.clusterUUID("cluster-uuid")
.previousClusterUUID("prev-cluster-uuid")
.build();

Mockito.verify(rcssSpy)
.writeMetadataInParallel(
eq(clusterState),
eq(new ArrayList<IndexMetadata>(clusterState.metadata().indices().values())),
eq(Collections.singletonMap(indices.get(0).getIndexName(), null)),
eq(clusterState.metadata().customs()),
eq(true),
eq(true),
eq(true),
eq(true),
eq(false),
eq(false),
eq(Collections.emptyMap()),
eq(true),
Mockito.anyList()
);

assertThat(manifestInfo.getManifestFileName(), notNullValue());
assertThat(manifest.getIndices().size(), is(1));
assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName()));
assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID()));
assertThat(manifest.getIndices().get(0).getUploadedFilename(), notNullValue());
assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm()));
assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion()));
assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID()));
assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID()));
assertThat(manifest.getHashesOfConsistentSettings(), notNullValue());
assertThat(manifest.getDiscoveryNodesMetadata(), notNullValue());
assertThat(manifest.getClusterBlocksMetadata(), nullValue());
assertThat(manifest.getClusterStateCustomMap(), anEmptyMap());
assertThat(manifest.getTransientSettingsMetadata(), nullValue());
assertThat(manifest.getTemplatesMetadata(), notNullValue());
assertThat(manifest.getCoordinationMetadata(), notNullValue());
assertThat(manifest.getCustomMetadataMap().size(), is(2));
assertThat(manifest.getIndicesRouting().size(), is(1));
}

/*
Expand Down Expand Up @@ -2012,7 +2125,9 @@ static ClusterState.Builder generateClusterStateWithOneIndex() {
.build();
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build();
final Settings settings = Settings.builder().put("mock-settings", true).build();
final TemplatesMetadata templatesMetadata = TemplatesMetadata.EMPTY_METADATA;
final TemplatesMetadata templatesMetadata = TemplatesMetadata.builder()
.put(IndexTemplateMetadata.builder("template1").settings(idxSettings).patterns(List.of("test*")).build())
.build();
final CustomMetadata1 customMetadata1 = new CustomMetadata1("custom-metadata-1");
return ClusterState.builder(ClusterName.DEFAULT)
.version(1L)
Expand All @@ -2025,14 +2140,16 @@ static ClusterState.Builder generateClusterStateWithOneIndex() {
.coordinationMetadata(coordinationMetadata)
.persistentSettings(settings)
.templates(templatesMetadata)
.hashesOfConsistentSettings(Map.of("key1", "value1", "key2", "value2"))
.putCustom(customMetadata1.getWriteableName(), customMetadata1)
.build()
)
.routingTable(RoutingTable.builder().addAsNew(indexMetadata).version(1L).build());
}

static DiscoveryNodes nodesWithLocalNodeClusterManager() {
return DiscoveryNodes.builder().clusterManagerNodeId("cluster-manager-id").localNodeId("cluster-manager-id").build();
final DiscoveryNode localNode = new DiscoveryNode("cluster-manager-id", buildNewFakeTransportAddress(), Version.CURRENT);
return DiscoveryNodes.builder().clusterManagerNodeId("cluster-manager-id").localNodeId("cluster-manager-id").add(localNode).build();
}

private static class CustomMetadata1 extends TestCustomMetadata {
Expand Down
Loading
Loading