Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Internal: Add support for sending cluster state using diffs #9220

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
*/
public class NodesInfoResponse extends NodesOperationResponse<NodeInfo> implements ToXContent {

private SettingsFilter settingsFilter;

public NodesInfoResponse() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why this was removed?

}

Expand All @@ -64,11 +62,6 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

public NodesInfoResponse settingsFilter(SettingsFilter settingsFilter) {
this.settingsFilter = settingsFilter;
return this;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("cluster_name", getClusterName().value(), XContentBuilder.FieldCaseConversion.NONE);
Expand Down Expand Up @@ -102,7 +95,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

if (nodeInfo.getSettings() != null) {
builder.startObject("settings");
Settings settings = settingsFilter != null ? settingsFilter.filterSettings(nodeInfo.getSettings()) : nodeInfo.getSettings();
Settings settings = nodeInfo.getSettings();
settings.toXContent(builder, params);
builder.endObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public RoutingExplanations getExplanations() {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
state = ClusterState.Builder.readFrom(in, null, null);
state = ClusterState.Builder.readFrom(in, null);
readAcknowledged(in);
explanations = RoutingExplanations.readFrom(in);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public ClusterName getClusterName() {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterName = ClusterName.readClusterName(in);
clusterState = ClusterState.Builder.readFrom(in, null, clusterName);
clusterState = ClusterState.Builder.readFrom(in, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe it's more intuitive to have a readFrom(StreamInput input) that delegates than passing null here?

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStatePart;
import org.elasticsearch.cluster.ClusterStatePart.Factory;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaData.Custom;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -41,8 +42,6 @@

import java.util.List;

import static com.google.common.collect.Lists.newArrayList;
import static org.elasticsearch.cluster.metadata.MetaData.lookupFactorySafe;

/**
*
Expand Down Expand Up @@ -127,10 +126,10 @@ protected void masterOperation(final ClusterStateRequest request, final ClusterS
}

// Filter our metadata that shouldn't be returned by API
for(ObjectCursor<String> type : currentState.metaData().customs().keys()) {
Custom.Factory factory = lookupFactorySafe(type.value);
if(!factory.context().contains(MetaData.XContentContext.API)) {
mdBuilder.removeCustom(type.value);
for(ObjectObjectCursor<String, ClusterStatePart> cursor : currentState.metaData().customs()) {
Factory<ClusterStatePart> factory = MetaData.FACTORY.lookupFactorySafe(cursor.key);
if(!factory.context().contains(ClusterStatePart.XContentContext.API)) {
mdBuilder.removeCustom(cursor.key);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.metadata.IndexClusterStatePart;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -51,7 +52,7 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ

private final Set<Alias> aliases = Sets.newHashSet();

private final Map<String, IndexMetaData.Custom> customs = newHashMap();
private final Map<String, IndexClusterStatePart> customs = newHashMap();

private final Set<ClusterBlock> blocks = Sets.newHashSet();

Expand All @@ -77,7 +78,7 @@ public CreateIndexClusterStateUpdateRequest aliases(Set<Alias> aliases) {
return this;
}

public CreateIndexClusterStateUpdateRequest customs(Map<String, IndexMetaData.Custom> customs) {
public CreateIndexClusterStateUpdateRequest customs(Map<String, IndexClusterStatePart> customs) {
this.customs.putAll(customs);
return this;
}
Expand Down Expand Up @@ -120,7 +121,7 @@ public Set<Alias> aliases() {
return aliases;
}

public Map<String, IndexMetaData.Custom> customs() {
public Map<String, IndexClusterStatePart> customs() {
return customs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.cluster.metadata.IndexClusterStatePart;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -73,7 +73,7 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>

private final Set<Alias> aliases = Sets.newHashSet();

private final Map<String, IndexMetaData.Custom> customs = newHashMap();
private final Map<String, IndexClusterStatePart> customs = newHashMap();

CreateIndexRequest() {
}
Expand Down Expand Up @@ -406,11 +406,11 @@ public CreateIndexRequest source(Map<String, Object> source) {
aliases((Map<String, Object>) entry.getValue());
} else {
// maybe custom?
IndexMetaData.Custom.Factory factory = IndexMetaData.lookupFactory(name);
IndexClusterStatePart.Factory<IndexClusterStatePart> factory = IndexMetaData.FACTORY.lookupFactory(name);
if (factory != null) {
found = true;
try {
customs.put(name, factory.fromMap((Map<String, Object>) entry.getValue()));
customs.put(name, factory.fromMap((Map<String, Object>) entry.getValue(), null));
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse custom metadata for [" + name + "]");
}
Expand All @@ -435,12 +435,12 @@ Set<Alias> aliases() {
/**
* Adds custom metadata to the index to be created.
*/
public CreateIndexRequest custom(IndexMetaData.Custom custom) {
customs.put(custom.type(), custom);
public CreateIndexRequest custom(IndexClusterStatePart custom) {
customs.put(custom.partType(), custom);
return this;
}

Map<String, IndexMetaData.Custom> customs() {
Map<String, IndexClusterStatePart> customs() {
return this.customs;
}

Expand All @@ -458,7 +458,7 @@ public void readFrom(StreamInput in) throws IOException {
int customSize = in.readVInt();
for (int i = 0; i < customSize; i++) {
String type = in.readString();
IndexMetaData.Custom customIndexMetaData = IndexMetaData.lookupFactorySafe(type).readFrom(in);
IndexClusterStatePart customIndexMetaData = IndexMetaData.FACTORY.lookupFactorySafe(type).readFrom(in, null);
customs.put(type, customIndexMetaData);
}
int aliasesSize = in.readVInt();
Expand All @@ -480,9 +480,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(entry.getValue());
}
out.writeVInt(customs.size());
for (Map.Entry<String, IndexMetaData.Custom> entry : customs.entrySet()) {
out.writeString(entry.getKey());
IndexMetaData.lookupFactorySafe(entry.getKey()).writeTo(entry.getValue(), out);
for (Map.Entry<String, IndexClusterStatePart> entry : customs.entrySet()) {
IndexClusterStatePart.Factory<IndexClusterStatePart> factory = IndexMetaData.FACTORY.lookupFactorySafe(entry.getKey());
if (factory.addedIn().onOrAfter(out.getVersion())) {
out.writeString(entry.getKey());
factory.writeTo(entry.getValue(), out);
}
}
out.writeVInt(aliases.size());
for (Alias alias : aliases) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexClusterStatePart;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -232,7 +232,7 @@ public CreateIndexRequestBuilder setSource(Map<String, Object> source) {
/**
* Adds custom metadata to the index to be created.
*/
public CreateIndexRequestBuilder addCustom(IndexMetaData.Custom custom) {
public CreateIndexRequestBuilder addCustom(IndexClusterStatePart custom) {
request.custom(custom);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ protected void masterOperation(GetSettingsRequest request, ClusterState state, A
continue;
}

Settings settings = settingsFilter.filterSettings(indexMetaData.settings());
Settings settings = SettingsFilter.filterSettings(settingsFilter.getPatterns(), indexMetaData.settings());
if (!CollectionUtils.isEmpty(request.names())) {
ImmutableSettings.Builder settingsBuilder = ImmutableSettings.builder();
for (Map.Entry<String, String> entry : settings.getAsMap().entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.ClusterStatePart;
import org.elasticsearch.cluster.metadata.IndexClusterStatePart;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -71,7 +73,7 @@ public class PutIndexTemplateRequest extends MasterNodeOperationRequest<PutIndex

private final Set<Alias> aliases = newHashSet();

private Map<String, IndexMetaData.Custom> customs = newHashMap();
private Map<String, IndexClusterStatePart> customs = newHashMap();

PutIndexTemplateRequest() {
}
Expand Down Expand Up @@ -294,10 +296,10 @@ public PutIndexTemplateRequest source(Map templateSource) {
aliases((Map<String, Object>) entry.getValue());
} else {
// maybe custom?
IndexMetaData.Custom.Factory factory = IndexMetaData.lookupFactory(name);
ClusterStatePart.Factory<IndexClusterStatePart> factory = IndexMetaData.FACTORY.lookupFactory(name);
if (factory != null) {
try {
customs.put(name, factory.fromMap((Map<String, Object>) entry.getValue()));
customs.put(name, factory.fromMap((Map<String, Object>) entry.getValue(), null));
} catch (IOException e) {
throw new ElasticsearchParseException("failed to parse custom metadata for [" + name + "]");
}
Expand Down Expand Up @@ -347,12 +349,12 @@ public PutIndexTemplateRequest source(BytesReference source) {
}
}

public PutIndexTemplateRequest custom(IndexMetaData.Custom custom) {
customs.put(custom.type(), custom);
public PutIndexTemplateRequest custom(IndexClusterStatePart custom) {
customs.put(custom.partType(), custom);
return this;
}

Map<String, IndexMetaData.Custom> customs() {
Map<String, IndexClusterStatePart> customs() {
return this.customs;
}

Expand Down Expand Up @@ -442,7 +444,7 @@ public void readFrom(StreamInput in) throws IOException {
int customSize = in.readVInt();
for (int i = 0; i < customSize; i++) {
String type = in.readString();
IndexMetaData.Custom customIndexMetaData = IndexMetaData.lookupFactorySafe(type).readFrom(in);
IndexClusterStatePart customIndexMetaData = IndexMetaData.FACTORY.lookupFactorySafe(type).readFrom(in, null);
customs.put(type, customIndexMetaData);
}
int aliasesSize = in.readVInt();
Expand All @@ -466,9 +468,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(entry.getValue());
}
out.writeVInt(customs.size());
for (Map.Entry<String, IndexMetaData.Custom> entry : customs.entrySet()) {
out.writeString(entry.getKey());
IndexMetaData.lookupFactorySafe(entry.getKey()).writeTo(entry.getValue(), out);
for (Map.Entry<String, IndexClusterStatePart> entry : customs.entrySet()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have seen this loop twice maybe we can have a utility method for this?

IndexClusterStatePart.Factory<IndexClusterStatePart> factory = IndexMetaData.FACTORY.lookupFactorySafe(entry.getKey());
if(factory.addedIn().onOrAfter(out.getVersion())) {
out.writeString(entry.getKey());
factory.writeTo(entry.getValue(), out);
}
}
out.writeVInt(aliases.size());
for (Alias alias : aliases) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public ClusterState execute(ClusterState currentState) {
if (indexMetaData == null) {
throw new IndexMissingException(new Index(index));
}
IndexWarmersMetaData warmers = indexMetaData.custom(IndexWarmersMetaData.TYPE);
IndexWarmersMetaData warmers = indexMetaData.get(IndexWarmersMetaData.TYPE);
if (warmers != null) {
List<IndexWarmersMetaData.Entry> entries = Lists.newArrayList();
for (IndexWarmersMetaData.Entry entry : warmers.entries()) {
Expand Down Expand Up @@ -138,7 +138,7 @@ public ClusterState execute(ClusterState currentState) {
if (indexMetaData == null) {
throw new IndexMissingException(new Index(index));
}
IndexWarmersMetaData warmers = indexMetaData.custom(IndexWarmersMetaData.TYPE);
IndexWarmersMetaData warmers = indexMetaData.get(IndexWarmersMetaData.TYPE);
if (warmers != null) {
for (IndexWarmersMetaData.Entry entry : warmers.entries()) {
for (String warmer : request.names()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public ClusterState execute(ClusterState currentState) {
if (indexMetaData == null) {
throw new IndexMissingException(new Index(index));
}
IndexWarmersMetaData warmers = indexMetaData.custom(IndexWarmersMetaData.TYPE);
IndexWarmersMetaData warmers = indexMetaData.get(IndexWarmersMetaData.TYPE);
if (warmers == null) {
logger.info("[{}] putting warmer [{}]", index, request.name());
warmers = new IndexWarmersMetaData(new IndexWarmersMetaData.Entry(request.name(), request.searchRequest().types(), request.searchRequest().queryCache(), source));
Expand Down
Loading