Skip to content

Commit

Permalink
[Remote Routing Table] Initial commit for index routing table manifest (
Browse files Browse the repository at this point in the history
#13577)

* Initial commit for index routing table manifest

Co-authored-by: Bukhtawar Khan <bukhtawa@amazon.com>
Co-authored-by: Himshikha Gupta <himshikh@amazon.com>
Co-authored-by: Arpit Bandejiya <abandeji@amazon.com>
  • Loading branch information
4 people authored Jun 6, 2024
1 parent 0b2e012 commit 7faae25
Show file tree
Hide file tree
Showing 17 changed files with 543 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@
* GitHub history for details.
*/

package org.opensearch.index.translog;
package org.opensearch.core.common.io.stream;

import org.apache.lucene.store.BufferedChecksum;
import org.apache.lucene.util.BitUtil;
import org.opensearch.core.common.io.stream.FilterStreamInput;
import org.opensearch.core.common.io.stream.StreamInput;

import java.io.EOFException;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@
* GitHub history for details.
*/

package org.opensearch.index.translog;
package org.opensearch.core.common.io.stream;

import org.apache.lucene.store.BufferedChecksum;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.zip.CRC32;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment {
public static final int CODEC_V0 = 0; // Older codec version, where we haven't introduced codec versions for manifest.
public static final int CODEC_V1 = 1; // In Codec V1 we have introduced global-metadata and codec version in Manifest file.
public static final int CODEC_V2 = 2; // In Codec V2, there are seperate metadata files rather than a single global metadata file.
public static final int CODEC_V3 = 3; // In Codec V3, we introduce index routing-metadata in manifest file.

private static final ParseField CLUSTER_TERM_FIELD = new ParseField("cluster_term");
private static final ParseField STATE_VERSION_FIELD = new ParseField("state_version");
Expand All @@ -58,6 +59,8 @@ public class ClusterMetadataManifest implements Writeable, ToXContentFragment {
private static final ParseField UPLOADED_SETTINGS_METADATA = new ParseField("uploaded_settings_metadata");
private static final ParseField UPLOADED_TEMPLATES_METADATA = new ParseField("uploaded_templates_metadata");
private static final ParseField UPLOADED_CUSTOM_METADATA = new ParseField("uploaded_custom_metadata");
private static final ParseField ROUTING_TABLE_VERSION_FIELD = new ParseField("routing_table_version");
private static final ParseField INDICES_ROUTING_FIELD = new ParseField("indices_routing");

private static ClusterMetadataManifest.Builder manifestV0Builder(Object[] fields) {
return ClusterMetadataManifest.builder()
Expand Down Expand Up @@ -86,6 +89,12 @@ private static ClusterMetadataManifest.Builder manifestV2Builder(Object[] fields
.customMetadataMap(customMetadata(fields));
}

private static ClusterMetadataManifest.Builder manifestV3Builder(Object[] fields) {
return manifestV2Builder(fields).codecVersion(codecVersion(fields))
.routingTableVersion(routingTableVersion(fields))
.indicesRouting(indicesRouting(fields));
}

private static long term(Object[] fields) {
return (long) fields[0];
}
Expand Down Expand Up @@ -151,6 +160,14 @@ private static Map<String, UploadedMetadataAttribute> customMetadata(Object[] fi
return customs.stream().collect(Collectors.toMap(UploadedMetadataAttribute::getAttributeName, Function.identity()));
}

private static long routingTableVersion(Object[] fields) {
return (long) fields[15];
}

private static List<UploadedIndexMetadata> indicesRouting(Object[] fields) {
return (List<UploadedIndexMetadata>) fields[16];
}

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> PARSER_V0 = new ConstructingObjectParser<>(
"cluster_metadata_manifest",
fields -> manifestV0Builder(fields).build()
Expand All @@ -166,12 +183,18 @@ private static Map<String, UploadedMetadataAttribute> customMetadata(Object[] fi
fields -> manifestV2Builder(fields).build()
);

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> CURRENT_PARSER = PARSER_V2;
private static final ConstructingObjectParser<ClusterMetadataManifest, Void> PARSER_V3 = new ConstructingObjectParser<>(
"cluster_metadata_manifest",
fields -> manifestV3Builder(fields).build()
);

private static final ConstructingObjectParser<ClusterMetadataManifest, Void> CURRENT_PARSER = PARSER_V3;

static {
declareParser(PARSER_V0, CODEC_V0);
declareParser(PARSER_V1, CODEC_V1);
declareParser(PARSER_V2, CODEC_V2);
declareParser(PARSER_V3, CODEC_V3);
}

private static void declareParser(ConstructingObjectParser<ClusterMetadataManifest, Void> parser, long codec_version) {
Expand Down Expand Up @@ -216,6 +239,14 @@ private static void declareParser(ConstructingObjectParser<ClusterMetadataManife
UPLOADED_CUSTOM_METADATA
);
}
if (codec_version >= CODEC_V3) {
parser.declareLong(ConstructingObjectParser.constructorArg(), ROUTING_TABLE_VERSION_FIELD);
parser.declareObjectArray(
ConstructingObjectParser.constructorArg(),
(p, c) -> UploadedIndexMetadata.fromXContent(p),
INDICES_ROUTING_FIELD
);
}
}

private final int codecVersion;
Expand All @@ -234,6 +265,8 @@ private static void declareParser(ConstructingObjectParser<ClusterMetadataManife
private final boolean committed;
private final String previousClusterUUID;
private final boolean clusterUUIDCommitted;
private final long routingTableVersion;
private final List<UploadedIndexMetadata> indicesRouting;

public List<UploadedIndexMetadata> getIndices() {
return indices;
Expand Down Expand Up @@ -306,6 +339,14 @@ public boolean hasMetadataAttributesFiles() {
|| !uploadedCustomMetadataMap.isEmpty();
}

public long getRoutingTableVersion() {
return routingTableVersion;
}

public List<UploadedIndexMetadata> getIndicesRouting() {
return indicesRouting;
}

public ClusterMetadataManifest(
long clusterTerm,
long version,
Expand All @@ -322,7 +363,9 @@ public ClusterMetadataManifest(
UploadedMetadataAttribute uploadedCoordinationMetadata,
UploadedMetadataAttribute uploadedSettingsMetadata,
UploadedMetadataAttribute uploadedTemplatesMetadata,
Map<String, UploadedMetadataAttribute> uploadedCustomMetadataMap
Map<String, UploadedMetadataAttribute> uploadedCustomMetadataMap,
long routingTableVersion,
List<UploadedIndexMetadata> indicesRouting
) {
this.clusterTerm = clusterTerm;
this.stateVersion = version;
Expand All @@ -336,6 +379,8 @@ public ClusterMetadataManifest(
this.indices = Collections.unmodifiableList(indices);
this.previousClusterUUID = previousClusterUUID;
this.clusterUUIDCommitted = clusterUUIDCommitted;
this.routingTableVersion = routingTableVersion;
this.indicesRouting = Collections.unmodifiableList(indicesRouting);
this.uploadedCoordinationMetadata = uploadedCoordinationMetadata;
this.uploadedSettingsMetadata = uploadedSettingsMetadata;
this.uploadedTemplatesMetadata = uploadedTemplatesMetadata;
Expand Down Expand Up @@ -364,20 +409,26 @@ public ClusterMetadataManifest(StreamInput in) throws IOException {
in.readMap(StreamInput::readString, UploadedMetadataAttribute::new)
);
this.globalMetadataFileName = null;
this.routingTableVersion = in.readLong();
this.indicesRouting = Collections.unmodifiableList(in.readList(UploadedIndexMetadata::new));
} else if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
this.codecVersion = in.readInt();
this.globalMetadataFileName = in.readString();
this.uploadedCoordinationMetadata = null;
this.uploadedSettingsMetadata = null;
this.uploadedTemplatesMetadata = null;
this.uploadedCustomMetadataMap = null;
this.routingTableVersion = -1;
this.indicesRouting = null;
} else {
this.codecVersion = CODEC_V0; // Default codec
this.globalMetadataFileName = null;
this.uploadedCoordinationMetadata = null;
this.uploadedSettingsMetadata = null;
this.uploadedTemplatesMetadata = null;
this.uploadedCustomMetadataMap = null;
this.routingTableVersion = -1;
this.indicesRouting = null;
}
}

Expand All @@ -401,7 +452,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startArray(INDICES_FIELD.getPreferredName());
{
for (UploadedIndexMetadata uploadedIndexMetadata : indices) {
builder.startObject();
uploadedIndexMetadata.toXContent(builder, params);
builder.endObject();
}
}
builder.endArray();
Expand Down Expand Up @@ -433,6 +486,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(CODEC_VERSION_FIELD.getPreferredName(), getCodecVersion());
builder.field(GLOBAL_METADATA_FIELD.getPreferredName(), getGlobalMetadataFileName());
}
if (onOrAfterCodecVersion(CODEC_V3)) {
builder.field(ROUTING_TABLE_VERSION_FIELD.getPreferredName(), getRoutingTableVersion());
builder.startArray(INDICES_ROUTING_FIELD.getPreferredName());
{
for (UploadedIndexMetadata uploadedIndexMetadata : indicesRouting) {
builder.startObject();
uploadedIndexMetadata.toXContent(builder, params);
builder.endObject();
}
}
builder.endArray();
}
return builder;
}

Expand All @@ -454,6 +519,8 @@ public void writeTo(StreamOutput out) throws IOException {
uploadedSettingsMetadata.writeTo(out);
uploadedTemplatesMetadata.writeTo(out);
out.writeMap(uploadedCustomMetadataMap, StreamOutput::writeString, (o, v) -> v.writeTo(o));
out.writeLong(routingTableVersion);
out.writeCollection(indicesRouting);
} else if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeInt(codecVersion);
out.writeString(globalMetadataFileName);
Expand All @@ -480,7 +547,9 @@ public boolean equals(Object o) {
&& Objects.equals(previousClusterUUID, that.previousClusterUUID)
&& Objects.equals(clusterUUIDCommitted, that.clusterUUIDCommitted)
&& Objects.equals(globalMetadataFileName, that.globalMetadataFileName)
&& Objects.equals(codecVersion, that.codecVersion);
&& Objects.equals(codecVersion, that.codecVersion)
&& Objects.equals(routingTableVersion, that.routingTableVersion)
&& Objects.equals(indicesRouting, that.indicesRouting);
}

@Override
Expand All @@ -497,7 +566,9 @@ public int hashCode() {
nodeId,
committed,
previousClusterUUID,
clusterUUIDCommitted
clusterUUIDCommitted,
routingTableVersion,
indicesRouting
);
}

Expand All @@ -518,6 +589,10 @@ public static ClusterMetadataManifest fromXContentV1(XContentParser parser) thro
return PARSER_V1.parse(parser, null);
}

public static ClusterMetadataManifest fromXContentV2(XContentParser parser) throws IOException {
return PARSER_V2.parse(parser, null);
}

public static ClusterMetadataManifest fromXContent(XContentParser parser) throws IOException {
return CURRENT_PARSER.parse(parser, null);
}
Expand Down Expand Up @@ -545,12 +620,24 @@ public static class Builder {
private String previousClusterUUID;
private boolean committed;
private boolean clusterUUIDCommitted;
private long routingTableVersion;
private List<UploadedIndexMetadata> indicesRouting;

public Builder indices(List<UploadedIndexMetadata> indices) {
this.indices = indices;
return this;
}

public Builder routingTableVersion(long routingTableVersion) {
this.routingTableVersion = routingTableVersion;
return this;
}

public Builder indicesRouting(List<UploadedIndexMetadata> indicesRouting) {
this.indicesRouting = indicesRouting;
return this;
}

public Builder codecVersion(int codecVersion) {
this.codecVersion = codecVersion;
return this;
Expand Down Expand Up @@ -625,6 +712,10 @@ public List<UploadedIndexMetadata> getIndices() {
return indices;
}

public List<UploadedIndexMetadata> getIndicesRouting() {
return indicesRouting;
}

public Builder previousClusterUUID(String previousClusterUUID) {
this.previousClusterUUID = previousClusterUUID;
return this;
Expand All @@ -638,6 +729,7 @@ public Builder clusterUUIDCommitted(boolean clusterUUIDCommitted) {
public Builder() {
indices = new ArrayList<>();
customMetadataMap = new HashMap<>();
indicesRouting = new ArrayList<>();
}

public Builder(ClusterMetadataManifest manifest) {
Expand All @@ -657,6 +749,8 @@ public Builder(ClusterMetadataManifest manifest) {
this.indices = new ArrayList<>(manifest.indices);
this.previousClusterUUID = manifest.previousClusterUUID;
this.clusterUUIDCommitted = manifest.clusterUUIDCommitted;
this.routingTableVersion = manifest.routingTableVersion;
this.indicesRouting = new ArrayList<>(manifest.indicesRouting);
}

public ClusterMetadataManifest build() {
Expand All @@ -676,7 +770,9 @@ public ClusterMetadataManifest build() {
coordinationMetadata,
settingsMetadata,
templatesMetadata,
customMetadataMap
customMetadataMap,
routingTableVersion,
indicesRouting
);
}

Expand Down Expand Up @@ -776,11 +872,9 @@ public String getIndexUUID() {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject()
.field(INDEX_NAME_FIELD.getPreferredName(), getIndexName())
return builder.field(INDEX_NAME_FIELD.getPreferredName(), getIndexName())
.field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID())
.field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath())
.endObject();
.field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public class RemoteClusterStateService implements Closeable {
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
+ "updated : [{}], custom metadata updated : [{}]";
public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1;
public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V2;
public static final int MANIFEST_CURRENT_CODEC_VERSION = ClusterMetadataManifest.CODEC_V3;
public static final int GLOBAL_METADATA_CURRENT_CODEC_VERSION = 2;

// ToXContent Params with gateway mode.
Expand Down Expand Up @@ -806,7 +806,10 @@ private ClusterMetadataManifest uploadManifest(
uploadedCoordinationMetadata,
uploadedSettingsMetadata,
uploadedTemplatesMetadata,
uploadedCustomMetadataMap
uploadedCustomMetadataMap,
clusterState.routingTable().version(),
// TODO: Add actual list of changed indices routing with index routing upload flow.
new ArrayList<>()
);
writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName);
return manifest;
Expand Down
Loading

0 comments on commit 7faae25

Please sign in to comment.