diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java
index e3dedfe6252ec..55e3962aeebc3 100644
--- a/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java
+++ b/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java
@@ -89,6 +89,7 @@ protected void masterOperation(final ClusterStateRequest request, final ClusterS
logger.trace("Serving cluster state request using version {}", currentState.version());
ClusterState.Builder builder = ClusterState.builder(currentState.getClusterName());
builder.version(currentState.version());
+ builder.uuid(currentState.uuid());
if (request.nodes()) {
builder.nodes(currentState.nodes());
}
diff --git a/src/main/java/org/elasticsearch/cluster/ClusterState.java b/src/main/java/org/elasticsearch/cluster/ClusterState.java
index c7c3b3da3c3ed..00df87685c2f3 100644
--- a/src/main/java/org/elasticsearch/cluster/ClusterState.java
+++ b/src/main/java/org/elasticsearch/cluster/ClusterState.java
@@ -23,7 +23,7 @@
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
-import org.elasticsearch.Version;
+import org.elasticsearch.cluster.DiffableUtils.KeyedReader;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -77,7 +77,7 @@ public byte id() {
}
}
- public interface Custom {
+ public interface Custom extends Diffable {
interface Factory {
@@ -118,6 +118,8 @@ public static Custom.Factory lookupFactorySafe(String type
private final long version;
+ private final String uuid;
+
private final RoutingTable routingTable;
private final DiscoveryNodes nodes;
@@ -135,12 +137,13 @@ public static Custom.Factory lookupFactorySafe(String type
private volatile ClusterStateStatus status;
- public ClusterState(long version, ClusterState state) {
- this(state.clusterName, version, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), state.customs());
+ public ClusterState(long version, String uuid, ClusterState state) {
+ this(state.clusterName, version, uuid, state.metaData(), state.routingTable(), state.nodes(), state.blocks(), state.customs());
}
- public ClusterState(ClusterName clusterName, long version, MetaData metaData, RoutingTable routingTable, DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap customs) {
+ public ClusterState(ClusterName clusterName, long version, String uuid, MetaData metaData, RoutingTable routingTable, DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap customs) {
this.version = version;
+ this.uuid = uuid;
this.clusterName = clusterName;
this.metaData = metaData;
this.routingTable = routingTable;
@@ -167,6 +170,14 @@ public long getVersion() {
return version();
}
+ public String uuid() {
+ return this.uuid;
+ }
+
+ public String getUuid() {
+ return uuid();
+ }
+
public DiscoveryNodes nodes() {
return this.nodes;
}
@@ -234,6 +245,7 @@ public RoutingNodes readOnlyRoutingNodes() {
public String prettyPrint() {
StringBuilder sb = new StringBuilder();
sb.append("version: ").append(version).append("\n");
+ sb.append("uuid: ").append(uuid).append("\n");
sb.append("meta data version: ").append(metaData.version()).append("\n");
sb.append(nodes().prettyPrint());
sb.append(routingTable().prettyPrint());
@@ -304,14 +316,13 @@ public String toString() {
}
}
-
-
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
EnumSet metrics = Metric.parseString(params.param("metric", "_all"), true);
if (metrics.contains(Metric.VERSION)) {
builder.field("version", version);
+ builder.field("uuid", uuid);
}
if (metrics.contains(Metric.MASTER_NODE)) {
@@ -508,6 +519,7 @@ public static class Builder {
private final ClusterName clusterName;
private long version = 0;
+ private String uuid = "_na_";
private MetaData metaData = MetaData.EMPTY_META_DATA;
private RoutingTable routingTable = RoutingTable.EMPTY_ROUTING_TABLE;
private DiscoveryNodes nodes = DiscoveryNodes.EMPTY_NODES;
@@ -518,6 +530,7 @@ public static class Builder {
public Builder(ClusterState state) {
this.clusterName = state.clusterName;
this.version = state.version();
+ this.uuid = state.uuid();
this.nodes = state.nodes();
this.routingTable = state.routingTable();
this.metaData = state.metaData();
@@ -576,6 +589,16 @@ public Builder version(long version) {
return this;
}
+ public Builder uuid(String uuid) {
+ this.uuid = uuid;
+ return this;
+ }
+
+ public Builder randomUuid() {
+ this.uuid = Strings.randomBase64UUID();
+ return this;
+ }
+
public Custom getCustom(String type) {
return customs.get(type);
}
@@ -591,7 +614,7 @@ public Builder removeCustom(String type) {
}
public ClusterState build() {
- return new ClusterState(clusterName, version, metaData, routingTable, nodes, blocks, customs.build());
+ return new ClusterState(clusterName, version, uuid, metaData, routingTable, nodes, blocks, customs.build());
}
public static byte[] toBytes(ClusterState state) throws IOException {
@@ -600,6 +623,13 @@ public static byte[] toBytes(ClusterState state) throws IOException {
return os.bytes().toBytes();
}
+ public static byte[] toDiffBytes(ClusterState previousState, ClusterState state) throws IOException {
+ Diff diff = state.diffs(previousState);
+ BytesStreamOutput os = new BytesStreamOutput();
+ diff.writeTo(os);
+ return os.bytes().toBytes();
+ }
+
/**
* @param data input bytes
* @param localNode used to set the local node in the cluster state.
@@ -611,6 +641,7 @@ public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode) throw
public static void writeTo(ClusterState state, StreamOutput out) throws IOException {
state.clusterName.writeTo(out);
out.writeLong(state.version());
+ out.writeString(state.uuid());
MetaData.Builder.writeTo(state.metaData(), out);
RoutingTable.Builder.writeTo(state.routingTable(), out);
DiscoveryNodes.Builder.writeTo(state.nodes(), out);
@@ -630,6 +661,7 @@ public static ClusterState readFrom(StreamInput in, @Nullable DiscoveryNode loca
ClusterName clusterName = ClusterName.readClusterName(in);
Builder builder = new Builder(clusterName);
builder.version = in.readLong();
+ builder.uuid = in.readString();
builder.metaData = MetaData.Builder.readFrom(in);
builder.routingTable = RoutingTable.Builder.readFrom(in);
builder.nodes = DiscoveryNodes.Builder.readFrom(in, localNode);
@@ -642,5 +674,84 @@ public static ClusterState readFrom(StreamInput in, @Nullable DiscoveryNode loca
}
return builder.build();
}
+
+ }
+
+ public Diff diffs(ClusterState previousState) {
+ return new ClusterStateDiff(previousState, this);
+ }
+
+
+ private class ClusterStateDiff implements Diff {
+
+ private final long version;
+
+ private final String previousUuid;
+
+ private final String uuid;
+
+ private final ClusterName clusterName;
+
+ private final Diff routingTable;
+
+ private final Diff nodes;
+
+ private final Diff metaData;
+
+ private final Diff blocks;
+
+ private final Diff customs;
+
+ public ClusterStateDiff(ClusterState before, ClusterState after) {
+ previousUuid = before.uuid;
+ uuid = after.uuid;
+ version = after.version;
+ clusterName = after.clusterName;
+ routingTable = after.routingTable.diffs(before.routingTable);
+ nodes = after.nodes.diffs(before.nodes);
+ metaData = after.metaData.diffs(before.metaData);
+ blocks = after.blocks.diffs(before.blocks);
+ customs = DiffableUtils.diff(before.customs, after.customs);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ clusterName.writeTo(out);
+ out.writeString(previousUuid);
+ out.writeString(uuid);
+ out.writeLong(version);
+ routingTable.writeTo(out);
+ nodes.writeTo(out);
+ metaData.writeTo(out);
+ blocks.writeTo(out);
+ customs.writeTo(out);
+ }
+ }
+
+ public ClusterState readDiffs(StreamInput in, DiscoveryNode localNode) throws IOException {
+ Builder builder = new Builder(ClusterName.readClusterName(in));
+ String previousUuid = in.readString();
+ String uuid = in.readString();
+ long version = in.readLong();
+ if (uuid.equals(this.uuid)) {
+ // no need to read the rest - cluster state didn't change
+ return this;
+ }
+ if (previousUuid.equals(this.uuid) == false) {
+ throw new IncompatibleClusterStateVersionException("Expected diffs for version " + version() + " with uuid " + this.uuid + " got version " + version + " and uuid " + previousUuid);
+ }
+ builder.uuid(uuid);
+ builder.version(version);
+ builder.routingTable(routingTable.readDiffs(in));
+ builder.nodes(nodes.readDiffs(in, localNode));
+ builder.metaData(metaData.readDiffs(in));
+ builder.blocks(blocks.readDiffs(in));
+ DiffableUtils.readDiff(customs, in, new KeyedReader() {
+ @Override
+ public Custom readFrom(StreamInput in, String key) throws IOException {
+ return lookupFactorySafe(key).readFrom(in);
+ }
+ });
+ return builder.build();
}
}
diff --git a/src/main/java/org/elasticsearch/cluster/Diff.java b/src/main/java/org/elasticsearch/cluster/Diff.java
new file mode 100644
index 0000000000000..5e5c6ea0897e9
--- /dev/null
+++ b/src/main/java/org/elasticsearch/cluster/Diff.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster;
+
+import org.elasticsearch.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+
+/**
+ * Represents difference between states of cluster state parts
+ */
+public interface Diff {
+
+ /**
+ * Writes the differences into the output stream
+ * @param out
+ * @throws IOException
+ */
+ void writeTo(StreamOutput out) throws IOException;
+}
diff --git a/src/main/java/org/elasticsearch/cluster/Diffable.java b/src/main/java/org/elasticsearch/cluster/Diffable.java
new file mode 100644
index 0000000000000..5904a95165b55
--- /dev/null
+++ b/src/main/java/org/elasticsearch/cluster/Diffable.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster;
+
+import org.elasticsearch.common.io.stream.ImmutableStreamable;
+import org.elasticsearch.common.io.stream.StreamInput;
+
+import java.io.IOException;
+
+/**
+ * Cluster state part, changes in which can be serialized
+ */
+public interface Diffable extends ImmutableStreamable {
+
+ /**
+ * Returns serializable object representing differences between this and previousState
+ */
+ Diff diffs(T previousState);
+
+ /**
+ * Reads the {@link org.elasticsearch.cluster.Diff} from StreamInput and applies them to this
+ */
+ T readDiffs(StreamInput in) throws IOException;
+
+}
diff --git a/src/main/java/org/elasticsearch/cluster/DiffableUtils.java b/src/main/java/org/elasticsearch/cluster/DiffableUtils.java
new file mode 100644
index 0000000000000..9027c2107cf55
--- /dev/null
+++ b/src/main/java/org/elasticsearch/cluster/DiffableUtils.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster;
+
+import com.carrotsearch.hppc.cursors.ObjectCursor;
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
+import com.google.common.collect.ImmutableMap;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Maps.newHashMap;
+
+public final class DiffableUtils {
+ private DiffableUtils() {
+ }
+
+ /**
+ * Calculates diffs between two ImmutableOpenMaps of Diffable objects
+ */
+ public static > Diff diff(ImmutableOpenMap before, ImmutableOpenMap after) {
+ assert after != null && before != null;
+ Map diffs = newHashMap();
+ List deletes = newArrayList();
+ Map adds = newHashMap();
+ for (ObjectCursor key : before.keys()) {
+ if (!after.containsKey(key.value)) {
+ deletes.add(key.value);
+ }
+ }
+ for (ObjectObjectCursor partIter : after) {
+ T beforePart = before.get(partIter.key);
+ if (!partIter.value.equals(beforePart)) {
+ if (beforePart == null) {
+ adds.put(partIter.key, partIter.value);
+ } else {
+ diffs.put(partIter.key, partIter.value.diffs(beforePart));
+ }
+ }
+ }
+ return new CompositeDiff<>(deletes, diffs, adds);
+ }
+
+ /**
+ * Calculates diffs between two ImmutableMaps of Diffable objects
+ */
+ public static > Diff diff(ImmutableMap before, ImmutableMap after) {
+ assert after != null && before != null;
+ Map diffs = newHashMap();
+ List deletes = newArrayList();
+ Map adds = newHashMap();
+ for (String key : before.keySet()) {
+ if (!after.containsKey(key)) {
+ deletes.add(key);
+ }
+ }
+ for (ImmutableMap.Entry partIter : after.entrySet()) {
+ T beforePart = before.get(partIter.getKey());
+ if (!partIter.getValue().equals(beforePart)) {
+ if (beforePart == null) {
+ adds.put(partIter.getKey(), partIter.getValue());
+ } else {
+ diffs.put(partIter.getKey(), partIter.getValue().diffs(beforePart));
+ }
+ }
+ }
+ return new CompositeDiff<>(deletes, diffs, adds);
+ }
+
+
+ /**
+ * Reads Diff object produced by {@link org.elasticsearch.cluster.DiffableUtils#diff(org.elasticsearch.common.collect.ImmutableOpenMap, org.elasticsearch.common.collect.ImmutableOpenMap)} method
+ *
+ * The diffs are applied to the before object.
+ */
+ public static > ImmutableOpenMap readDiff(ImmutableOpenMap before, StreamInput in, final T proto) throws IOException {
+ return readDiff(before, in, new KeyedReader() {
+ @Override
+ public T readFrom(StreamInput in, String key) throws IOException {
+ return proto.readFrom(in);
+ }
+ });
+ }
+
+ /**
+ * Reads Diff object produced by {@link org.elasticsearch.cluster.DiffableUtils#diff(org.elasticsearch.common.collect.ImmutableOpenMap, org.elasticsearch.common.collect.ImmutableOpenMap)} method
+ *
+ * The diffs are applied to the before object.
+ */
+ public static > ImmutableOpenMap readDiff(ImmutableOpenMap before, StreamInput in, KeyedReader keyedReader) throws IOException {
+
+ ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder();
+ builder.putAll(before);
+
+ int deletesSize = in.readVInt();
+ for (int i = 0; i < deletesSize; i++) {
+ builder.remove(in.readString());
+ }
+
+ int diffsSize = in.readVInt();
+ for (int i = 0; i < diffsSize; i++) {
+ String key = in.readString();
+ builder.put(key, builder.get(key).readDiffs(in));
+ }
+
+ int addsSize = in.readVInt();
+ for (int i = 0; i < addsSize; i++) {
+ String key = in.readString();
+ builder.put(key, keyedReader.readFrom(in, key));
+ }
+ return builder.build();
+ }
+
+ /**
+ * A reader that can deserialize an object
+ * @param
+ */
+ public static interface KeyedReader {
+ T readFrom(StreamInput in, String key) throws IOException;
+ }
+
+ public static > ImmutableMap readDiff(ImmutableMap before, StreamInput in, final T proto) throws IOException {
+ HashMap builder = newHashMap();
+ builder.putAll(before);
+
+ int deletesSize = in.readVInt();
+ for (int i = 0; i < deletesSize; i++) {
+ builder.remove(in.readString());
+ }
+
+ int diffsSize = in.readVInt();
+ for (int i = 0; i < diffsSize; i++) {
+ String key = in.readString();
+ builder.put(key, builder.get(key).readDiffs(in));
+ }
+
+ int addsSize = in.readVInt();
+ for (int i = 0; i < addsSize; i++) {
+ String key = in.readString();
+ builder.put(key, proto.readFrom(in));
+ }
+ return ImmutableMap.copyOf(builder);
+ }
+
+ /**
+ * Represents differences between two maps of diffable objects
+ * @param the diffable object
+ */
+ private static class CompositeDiff> implements Diff {
+
+ private List deletes;
+ private Map diffs;
+ private Map adds;
+
+ public CompositeDiff(List deletes, Map diffs, Map adds) {
+ this.diffs = diffs;
+ this.deletes = deletes;
+ this.adds = adds;
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeVInt(deletes.size());
+ for (String delete : deletes) {
+ out.writeString(delete);
+ }
+
+ out.writeVInt(diffs.size());
+ for (Map.Entry entry : diffs.entrySet()) {
+ out.writeString(entry.getKey());
+ entry.getValue().writeTo(out);
+ }
+
+ out.writeVInt(adds.size());
+ for (Map.Entry entry : adds.entrySet()) {
+ out.writeString(entry.getKey());
+ entry.getValue().writeTo(out);
+ }
+ }
+ }
+}
diff --git a/src/main/java/org/elasticsearch/cluster/IncompatibleClusterStateVersionException.java b/src/main/java/org/elasticsearch/cluster/IncompatibleClusterStateVersionException.java
new file mode 100644
index 0000000000000..9c286a2db89a5
--- /dev/null
+++ b/src/main/java/org/elasticsearch/cluster/IncompatibleClusterStateVersionException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster;
+
+import org.elasticsearch.ElasticsearchException;
+
+/**
+ * Thrown by {@link org.elasticsearch.cluster.Diffable#readDiffs(org.elasticsearch.common.io.stream.StreamInput, org.elasticsearch.cluster.node.DiscoveryNode)} method
+ */
+public class IncompatibleClusterStateVersionException extends ElasticsearchException {
+ public IncompatibleClusterStateVersionException(String msg) {
+ super(msg);
+ }
+}
diff --git a/src/main/java/org/elasticsearch/cluster/SimpleDiffable.java b/src/main/java/org/elasticsearch/cluster/SimpleDiffable.java
new file mode 100644
index 0000000000000..684a06982a212
--- /dev/null
+++ b/src/main/java/org/elasticsearch/cluster/SimpleDiffable.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster;
+
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+
+public abstract class SimpleDiffable implements Diffable {
+
+ @Override
+ public Diff diffs(T previousState) {
+ if (this.equals(previousState)) {
+ return new Diff() {
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeBoolean(false);
+ }
+ };
+ } else {
+ return new Diff() {
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeBoolean(true);
+ SimpleDiffable.this.writeTo(out);
+ }
+ };
+ }
+ }
+
+ @Override
+ public T readDiffs(StreamInput in) throws IOException {
+ if(in.readBoolean()) {
+ return readFrom(in);
+ } else {
+ return get();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public T get() {
+ return (T) this;
+ }
+}
+
diff --git a/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java b/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java
index bb7d332de4f96..ba02c4d86cfde 100644
--- a/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java
+++ b/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java
@@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.elasticsearch.cluster.SimpleDiffable;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -36,7 +37,7 @@
/**
* Represents current cluster level blocks to block dirty operations done against the cluster.
*/
-public class ClusterBlocks {
+public class ClusterBlocks extends SimpleDiffable {
public static final ClusterBlocks EMPTY_CLUSTER_BLOCK = new ClusterBlocks(ImmutableSet.of(), ImmutableMap.>of());
@@ -203,6 +204,16 @@ public ClusterBlockException indicesBlockedException(ClusterBlockLevel level, St
return new ClusterBlockException(builder.build());
}
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ Builder.writeClusterBlocks(this, out);
+ }
+
+ @Override
+ public ClusterBlocks readFrom(StreamInput in) throws IOException {
+ return Builder.readClusterBlocks(in);
+ }
+
static class ImmutableLevelHolder {
static final ImmutableLevelHolder EMPTY = new ImmutableLevelHolder(ImmutableSet.of(), ImmutableMap.>of());
diff --git a/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java
index 253b27ddc96df..c91de3012494e 100644
--- a/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java
+++ b/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java
@@ -25,6 +25,7 @@
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
+import org.elasticsearch.cluster.SimpleDiffable;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
@@ -61,8 +62,11 @@
/**
*
*/
-public class IndexMetaData {
+public class IndexMetaData extends SimpleDiffable {
+ public static final IndexMetaData PROTO = IndexMetaData.builder("")
+ .settings(ImmutableSettings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
+ .numberOfShards(1).numberOfReplicas(0).build();
public interface Custom {
@@ -467,6 +471,16 @@ public int hashCode() {
return result;
}
+ @Override
+ public IndexMetaData readFrom(StreamInput in) throws IOException {
+ return Builder.readFrom(in);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ Builder.writeTo(this, out);
+ }
+
public static Builder builder(String index) {
return new Builder(index);
}
diff --git a/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java b/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java
index 6198f35cd52cf..8501afb6bad04 100644
--- a/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java
+++ b/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java
@@ -22,6 +22,7 @@
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.collect.Sets;
import org.elasticsearch.Version;
+import org.elasticsearch.cluster.SimpleDiffable;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.compress.CompressedString;
@@ -42,7 +43,9 @@
/**
*
*/
-public class IndexTemplateMetaData {
+public class IndexTemplateMetaData extends SimpleDiffable {
+
+ public static final IndexTemplateMetaData PROTO = IndexTemplateMetaData.builder("").build();
private final String name;
@@ -161,6 +164,16 @@ public int hashCode() {
return result;
}
+ @Override
+ public IndexTemplateMetaData readFrom(StreamInput in) throws IOException {
+ return Builder.readFrom(in);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ Builder.writeTo(this, out);
+ }
+
public static class Builder {
private static final Set VALID_FIELDS = Sets.newHashSet("template", "order", "mappings", "settings");
diff --git a/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java
index 85974fe236a7e..4450e937efd38 100644
--- a/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java
+++ b/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java
@@ -26,7 +26,9 @@
import com.google.common.base.Predicate;
import com.google.common.collect.*;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
+import org.elasticsearch.cluster.*;
import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.cluster.DiffableUtils.KeyedReader;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.common.Nullable;
@@ -56,7 +58,7 @@
/**
*
*/
-public class MetaData implements Iterable {
+public class MetaData implements Iterable, Diffable {
public static final String ALL = "_all";
@@ -75,7 +77,7 @@ public enum XContentContext {
public static EnumSet API_AND_GATEWAY = EnumSet.of(XContentContext.API, XContentContext.GATEWAY);
public static EnumSet API_AND_SNAPSHOT = EnumSet.of(XContentContext.API, XContentContext.SNAPSHOT);
- public interface Custom {
+ public interface Custom extends Diffable {
abstract class Factory {
@@ -1146,6 +1148,75 @@ public static boolean isGlobalStateEquals(MetaData metaData1, MetaData metaData2
return true;
}
+ @Override
+ public Diff diffs(MetaData previousState) {
+ return new MetaDataDiff(previousState, this);
+ }
+
+ private class MetaDataDiff implements Diff {
+
+ private final long version;
+
+ private final String uuid;
+
+ private final Settings transientSettings;
+ private final Settings persistentSettings;
+ private final Diff indices;
+ private final Diff templates;
+ private final Diff customs;
+
+
+ public MetaDataDiff(MetaData before, MetaData after) {
+ uuid = after.uuid;
+ version = after.version;
+ transientSettings = after.transientSettings;
+ persistentSettings = after.persistentSettings;
+ indices = DiffableUtils.diff(before.indices, after.indices);
+ templates = DiffableUtils.diff(before.templates, after.templates);
+ customs = DiffableUtils.diff(before.customs, after.customs);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeString(uuid);
+ out.writeLong(version);
+ ImmutableSettings.writeSettingsToStream(transientSettings, out);
+ ImmutableSettings.writeSettingsToStream(persistentSettings, out);
+ indices.writeTo(out);
+ templates.writeTo(out);
+ customs.writeTo(out);
+ }
+ }
+
+ @Override
+ public MetaData readDiffs(StreamInput in) throws IOException {
+ Builder builder = builder();
+ builder.uuid(in.readString());
+ builder.version(in.readLong());
+ builder.transientSettings(ImmutableSettings.readSettingsFromStream(in));
+ builder.persistentSettings(ImmutableSettings.readSettingsFromStream(in));
+ builder.indices(DiffableUtils.readDiff(indices, in, IndexMetaData.PROTO));
+ builder.templates(DiffableUtils.readDiff(templates, in, IndexTemplateMetaData.PROTO));
+ builder.customs(DiffableUtils.readDiff(customs, in, new KeyedReader() {
+ @Override
+ public Custom readFrom(StreamInput in, String key) throws IOException {
+ return lookupFactorySafe(key).readFrom(in);
+ }
+ }));
+ return builder.build();
+ }
+
+
+ @Override
+ public MetaData readFrom(StreamInput in) throws IOException {
+ return Builder.readFrom(in);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ Builder.writeTo(this, out);
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -1217,6 +1288,11 @@ public Builder removeAllIndices() {
return this;
}
+ public Builder indices(ImmutableOpenMap indices) {
+ this.indices.putAll(indices);
+ return this;
+ }
+
public Builder put(IndexTemplateMetaData.Builder template) {
return put(template.build());
}
@@ -1231,6 +1307,11 @@ public Builder removeTemplate(String templateName) {
return this;
}
+ public Builder templates(ImmutableOpenMap templates) {
+ this.templates.putAll(templates);
+ return this;
+ }
+
public Custom getCustom(String type) {
return customs.get(type);
}
@@ -1245,6 +1326,11 @@ public Builder removeCustom(String type) {
return this;
}
+ public Builder customs(ImmutableOpenMap customs) {
+ this.customs.putAll(customs);
+ return this;
+ }
+
public Builder updateSettings(Settings settings, String... indices) {
if (indices == null || indices.length == 0) {
indices = this.indices.keys().toArray(String.class);
@@ -1296,6 +1382,10 @@ public Builder version(long version) {
this.version = version;
return this;
}
+ public Builder uuid(String uuid) {
+ this.uuid = uuid;
+ return this;
+ }
public Builder generateUuidIfNeeded() {
if (uuid.equals("_na_")) {
diff --git a/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java b/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java
index 81b11fc14b1b6..c5d7c7b59507c 100644
--- a/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java
+++ b/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java
@@ -21,6 +21,8 @@
import com.google.common.collect.ImmutableList;
import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.cluster.SimpleDiffable;
+import org.elasticsearch.cluster.metadata.MetaData.Custom;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ImmutableSettings;
@@ -39,7 +41,7 @@
/**
* Contains metadata about registered snapshot repositories
*/
-public class RepositoriesMetaData implements MetaData.Custom {
+public class RepositoriesMetaData extends SimpleDiffable implements MetaData.Custom {
public static final String TYPE = "repositories";
@@ -80,6 +82,16 @@ public RepositoryMetaData repository(String name) {
return null;
}
+ @Override
+ public MetaData.Custom readFrom(StreamInput in) throws IOException {
+ return FACTORY.readFrom(in);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ FACTORY.writeTo(this, out);
+ }
+
/**
* Repository metadata factory
*/
diff --git a/src/main/java/org/elasticsearch/cluster/metadata/RestoreMetaData.java b/src/main/java/org/elasticsearch/cluster/metadata/RestoreMetaData.java
index 373d5ff858c9e..8467ff1fd0d43 100644
--- a/src/main/java/org/elasticsearch/cluster/metadata/RestoreMetaData.java
+++ b/src/main/java/org/elasticsearch/cluster/metadata/RestoreMetaData.java
@@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
+import org.elasticsearch.cluster.SimpleDiffable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
@@ -35,7 +36,7 @@
/**
* Meta data about restore processes that are currently executing
*/
-public class RestoreMetaData implements MetaData.Custom {
+public class RestoreMetaData extends SimpleDiffable implements MetaData.Custom {
public static final String TYPE = "restore";
@@ -394,6 +395,16 @@ public static State fromValue(byte value) {
}
}
+ @Override
+ public MetaData.Custom readFrom(StreamInput in) throws IOException {
+ return FACTORY.readFrom(in);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ FACTORY.writeTo(this, out);
+ }
+
/**
* Restore metadata factory
*/
diff --git a/src/main/java/org/elasticsearch/cluster/metadata/SnapshotMetaData.java b/src/main/java/org/elasticsearch/cluster/metadata/SnapshotMetaData.java
index b759fe5daebaf..31980a7df2cd6 100644
--- a/src/main/java/org/elasticsearch/cluster/metadata/SnapshotMetaData.java
+++ b/src/main/java/org/elasticsearch/cluster/metadata/SnapshotMetaData.java
@@ -22,6 +22,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
+import org.elasticsearch.cluster.SimpleDiffable;
+import org.elasticsearch.cluster.metadata.MetaData.Custom;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
@@ -38,7 +40,7 @@
/**
* Meta data about snapshots that are currently executing
*/
-public class SnapshotMetaData implements MetaData.Custom {
+public class SnapshotMetaData extends SimpleDiffable implements MetaData.Custom {
public static final String TYPE = "snapshots";
public static final Factory FACTORY = new Factory();
@@ -330,7 +332,16 @@ public Entry snapshot(SnapshotId snapshotId) {
return null;
}
+ @Override
+ public MetaData.Custom readFrom(StreamInput in) throws IOException {
+ return FACTORY.readFrom(in);
+ }
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ FACTORY.writeTo(this, out);
+ }
+
public static class Factory extends MetaData.Custom.Factory {
@Override
diff --git a/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java b/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java
index 0a4986476e5c1..bc0690e87633b 100644
--- a/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java
+++ b/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java
@@ -25,6 +25,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.UnmodifiableIterator;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
+import org.elasticsearch.cluster.Diff;
import org.elasticsearch.Version;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable;
@@ -568,6 +569,41 @@ public String shortSummary() {
}
}
+ public void writeTo(StreamOutput out) throws IOException {
+ Builder.writeTo(this, out);
+ }
+
+ public DiscoveryNodes readFrom(StreamInput in, DiscoveryNode localNode) throws IOException {
+ return Builder.readFrom(in, localNode);
+ }
+
+ public Diff diffs(DiscoveryNodes previousState) {
+ if (this.equals(previousState)) {
+ return new Diff() {
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeBoolean(false);
+ }
+ };
+ } else {
+ return new Diff() {
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeBoolean(true);
+ DiscoveryNodes.this.writeTo(out);
+ }
+ };
+ }
+ }
+
+ public DiscoveryNodes readDiffs(StreamInput in, DiscoveryNode localNode) throws IOException {
+ if(in.readBoolean()) {
+ return readFrom(in, localNode);
+ } else {
+ return this;
+ }
+ }
+
public static Builder builder() {
return new Builder();
}
diff --git a/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java
index 5f0356d357254..737d401816b8d 100644
--- a/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java
+++ b/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java
@@ -26,6 +26,8 @@
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import org.elasticsearch.ElasticsearchIllegalStateException;
+import org.elasticsearch.cluster.Diffable;
+import org.elasticsearch.cluster.SimpleDiffable;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
@@ -56,7 +58,9 @@
* represented as {@link ShardRouting}.
*
*/
-public class IndexRoutingTable implements Iterable {
+public class IndexRoutingTable extends SimpleDiffable implements Iterable {
+
+ public static final IndexRoutingTable PROTO = builder("").build();
private final String index;
private final ShardShuffler shuffler;
@@ -315,9 +319,39 @@ public GroupShardsIterator groupByAllIt() {
return new GroupShardsIterator(set);
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ IndexRoutingTable that = (IndexRoutingTable) o;
+
+ if (!index.equals(that.index)) return false;
+ if (!shards.equals(that.shards)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = index.hashCode();
+ result = 31 * result + shards.hashCode();
+ return result;
+ }
+
public void validate() throws RoutingValidationException {
}
+ @Override
+ public IndexRoutingTable readFrom(StreamInput in) throws IOException {
+ return Builder.readFrom(in);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ Builder.writeTo(this, out);
+ }
+
public static Builder builder(String index) {
return new Builder(index);
}
diff --git a/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java
index 00e50b76129b9..2371b96f5b0a5 100644
--- a/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java
+++ b/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java
@@ -347,6 +347,28 @@ public ShardIterator preferNodeActiveInitializingShardsIt(String nodeId) {
return new PlainShardIterator(shardId, ordered);
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ IndexShardRoutingTable that = (IndexShardRoutingTable) o;
+
+ if (primaryAllocatedPostApi != that.primaryAllocatedPostApi) return false;
+ if (!shardId.equals(that.shardId)) return false;
+ if (!shards.equals(that.shards)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = shardId.hashCode();
+ result = 31 * result + shards.hashCode();
+ result = 31 * result + (primaryAllocatedPostApi ? 1 : 0);
+ return result;
+ }
+
/**
* Returns true
iff all shards in the routing table are started otherwise false
*/
diff --git a/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
index 6f44a1d11fc04..2c247e598a9bb 100644
--- a/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
+++ b/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
@@ -21,7 +21,7 @@
import com.carrotsearch.hppc.IntSet;
import com.google.common.collect.*;
-import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -44,7 +44,7 @@
*
* @see IndexRoutingTable
*/
-public class RoutingTable implements Iterable {
+public class RoutingTable implements Iterable, Diffable {
public static final RoutingTable EMPTY_ROUTING_TABLE = builder().build();
@@ -287,6 +287,46 @@ public GroupShardsIterator activePrimaryShardsGrouped(String[] indices, boolean
return new GroupShardsIterator(set);
}
+ @Override
+ public Diff diffs(RoutingTable previousState) {
+ return new RoutingTableDiff(previousState, this);
+ }
+
+ @Override
+ public RoutingTable readDiffs(StreamInput in) throws IOException {
+ long version = in.readLong();
+ ImmutableMap indicesRouting = DiffableUtils.readDiff(this.indicesRouting, in, IndexRoutingTable.PROTO);
+ return new RoutingTable(version, indicesRouting);
+ }
+
+ @Override
+ public RoutingTable readFrom(StreamInput in) throws IOException {
+ return Builder.readFrom(in);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ Builder.writeTo(this, out);
+ }
+
+ private class RoutingTableDiff implements Diff {
+
+ private final long version;
+
+ private final Diff indicesRouting;
+
+ public RoutingTableDiff(RoutingTable before, RoutingTable after) {
+ version = after.version;
+ indicesRouting = DiffableUtils.diff(before.indicesRouting, after.indicesRouting);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeLong(version);
+ indicesRouting.writeTo(out);
+ }
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -436,6 +476,11 @@ public Builder add(IndexRoutingTable.Builder indexRoutingTableBuilder) {
return this;
}
+ public Builder indicesRouting(ImmutableMap indicesRouting) {
+ this.indicesRouting.putAll(indicesRouting);
+ return this;
+ }
+
public Builder remove(String index) {
indicesRouting.remove(index);
return this;
@@ -483,5 +528,4 @@ public String prettyPrint() {
return sb.toString();
}
-
}
diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java
index 6eef633050470..0e59b14c00d7b 100644
--- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java
+++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java
@@ -36,6 +36,7 @@
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
@@ -113,7 +114,7 @@ public InternalClusterService(Settings settings, DiscoveryService discoveryServi
this.version = version;
// will be replaced on doStart.
- this.clusterState = ClusterState.builder(clusterName).build();
+ this.clusterState = ClusterState.builder(clusterName).randomUuid().build();
this.nodeSettingsService.setClusterService(this);
@@ -400,7 +401,7 @@ public void run() {
Discovery.AckListener ackListener = new NoOpAckListener();
if (newClusterState.nodes().localNodeMaster()) {
// only the master controls the version numbers
- Builder builder = ClusterState.builder(newClusterState).version(newClusterState.version() + 1);
+ Builder builder = ClusterState.builder(newClusterState).version(newClusterState.version() + 1).randomUuid();
if (previousClusterState.routingTable() != newClusterState.routingTable()) {
builder.routingTable(RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1));
}
@@ -465,7 +466,7 @@ public void run() {
// we don't want to notify
if (newClusterState.nodes().localNodeMaster()) {
logger.debug("publishing cluster state version {}", newClusterState.version());
- discoveryService.publish(newClusterState, ackListener);
+ discoveryService.publish(clusterChangedEvent, ackListener);
}
// update the current cluster state
@@ -512,7 +513,7 @@ public void run() {
logger.debug("processing [{}]: done applying updated cluster_state (version: {})", source, newClusterState.version());
} catch (Throwable t) {
- StringBuilder sb = new StringBuilder("failed to apply updated cluster state:\nversion [").append(newClusterState.version()).append("], source [").append(source).append("]\n");
+ StringBuilder sb = new StringBuilder("failed to apply updated cluster state:\nversion [").append(newClusterState.version()).append("], uuid [").append(newClusterState.uuid()).append("], source [").append(source).append("]\n");
sb.append(newClusterState.nodes().prettyPrint());
sb.append(newClusterState.routingTable().prettyPrint());
sb.append(newClusterState.readOnlyRoutingNodes().prettyPrint());
diff --git a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java
index 22195c31c12ef..4186658806298 100644
--- a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java
+++ b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java
@@ -96,6 +96,7 @@ public ClusterDynamicSettingsModule() {
clusterDynamicSettings.addDynamicSetting(SnapshotInProgressAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED);
clusterDynamicSettings.addDynamicSetting(DestructiveOperations.REQUIRES_NAME);
clusterDynamicSettings.addDynamicSetting(DiscoverySettings.PUBLISH_TIMEOUT, Validator.TIME_NON_NEGATIVE);
+ clusterDynamicSettings.addDynamicSetting(DiscoverySettings.PUBLISH_DIFF_ENABLE, Validator.BOOLEAN);
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE);
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, Validator.MEMORY_SIZE);
clusterDynamicSettings.addDynamicSetting(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
diff --git a/src/main/java/org/elasticsearch/common/io/stream/ImmutableStreamable.java b/src/main/java/org/elasticsearch/common/io/stream/ImmutableStreamable.java
new file mode 100644
index 0000000000000..5e1f5faa6db99
--- /dev/null
+++ b/src/main/java/org/elasticsearch/common/io/stream/ImmutableStreamable.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.io.stream;
+
+import java.io.IOException;
+
+public interface ImmutableStreamable {
+
+ /**
+ * Reads a copy of an object with the same type form the stream input
+ *
+ * The caller object remains unchanged.
+ */
+ T readFrom(StreamInput in) throws IOException;
+
+ /**
+ * Writes the current object into the output stream out
+ */
+ void writeTo(StreamOutput out) throws IOException;
+}
diff --git a/src/main/java/org/elasticsearch/discovery/Discovery.java b/src/main/java/org/elasticsearch/discovery/Discovery.java
index dfd51e6348f07..36b8e5da6f5f5 100644
--- a/src/main/java/org/elasticsearch/discovery/Discovery.java
+++ b/src/main/java/org/elasticsearch/discovery/Discovery.java
@@ -19,6 +19,7 @@
package org.elasticsearch.discovery;
+import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
@@ -59,7 +60,7 @@ public interface Discovery extends LifecycleComponent {
* The {@link AckListener} allows to keep track of the ack received from nodes, and verify whether
* they updated their own cluster state or not.
*/
- void publish(ClusterState clusterState, AckListener ackListener);
+ void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener);
public static interface AckListener {
void onNodeAck(DiscoveryNode node, @Nullable Throwable t);
diff --git a/src/main/java/org/elasticsearch/discovery/DiscoveryService.java b/src/main/java/org/elasticsearch/discovery/DiscoveryService.java
index f73f2bbb5939b..e6a3668921bea 100644
--- a/src/main/java/org/elasticsearch/discovery/DiscoveryService.java
+++ b/src/main/java/org/elasticsearch/discovery/DiscoveryService.java
@@ -21,6 +21,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -132,9 +133,9 @@ public String nodeDescription() {
* The {@link org.elasticsearch.discovery.Discovery.AckListener} allows to acknowledge the publish
* event based on the response gotten from all nodes
*/
- public void publish(ClusterState clusterState, Discovery.AckListener ackListener) {
+ public void publish(ClusterChangedEvent clusterChangedEvent, Discovery.AckListener ackListener) {
if (lifecycle.started()) {
- discovery.publish(clusterState, ackListener);
+ discovery.publish(clusterChangedEvent, ackListener);
}
}
diff --git a/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java b/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java
index b8d48b16129a9..3ef877f326141 100644
--- a/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java
+++ b/src/main/java/org/elasticsearch/discovery/DiscoverySettings.java
@@ -38,16 +38,19 @@ public class DiscoverySettings extends AbstractComponent {
public static final String PUBLISH_TIMEOUT = "discovery.zen.publish_timeout";
public static final String NO_MASTER_BLOCK = "discovery.zen.no_master_block";
+ public static final String PUBLISH_DIFF_ENABLE = "discovery.zen.publish_diffs.enable";
public static final TimeValue DEFAULT_PUBLISH_TIMEOUT = TimeValue.timeValueSeconds(30);
public static final String DEFAULT_NO_MASTER_BLOCK = "write";
public final static int NO_MASTER_BLOCK_ID = 2;
+ public final static boolean DEFAULT_PUBLISH_DIFF_ENABLE = true;
public final static ClusterBlock NO_MASTER_BLOCK_ALL = new ClusterBlock(NO_MASTER_BLOCK_ID, "no master", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);
public final static ClusterBlock NO_MASTER_BLOCK_WRITES = new ClusterBlock(NO_MASTER_BLOCK_ID, "no master", true, false, RestStatus.SERVICE_UNAVAILABLE, EnumSet.of(ClusterBlockLevel.WRITE, ClusterBlockLevel.METADATA));
private volatile ClusterBlock noMasterBlock;
private volatile TimeValue publishTimeout = DEFAULT_PUBLISH_TIMEOUT;
+ private volatile boolean publishDiffs = DEFAULT_PUBLISH_DIFF_ENABLE;
@Inject
public DiscoverySettings(Settings settings, NodeSettingsService nodeSettingsService) {
@@ -55,6 +58,7 @@ public DiscoverySettings(Settings settings, NodeSettingsService nodeSettingsServ
nodeSettingsService.addListener(new ApplySettings());
this.noMasterBlock = parseNoMasterBlock(settings.get(NO_MASTER_BLOCK, DEFAULT_NO_MASTER_BLOCK));
this.publishTimeout = settings.getAsTime(PUBLISH_TIMEOUT, publishTimeout);
+ this.publishDiffs = settings.getAsBoolean(PUBLISH_DIFF_ENABLE, DEFAULT_PUBLISH_DIFF_ENABLE);
}
/**
@@ -68,6 +72,8 @@ public ClusterBlock getNoMasterBlock() {
return noMasterBlock;
}
+ public boolean getPublishDiffs() { return publishDiffs;}
+
private class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
@@ -85,6 +91,13 @@ public void onRefreshSettings(Settings settings) {
noMasterBlock = newNoMasterBlock;
}
}
+ Boolean newPublishDiffs = settings.getAsBoolean(PUBLISH_DIFF_ENABLE, null);
+ if (newPublishDiffs != null) {
+ if (newPublishDiffs != publishDiffs) {
+ logger.info("updating [{}] from [{}] to [{}]", PUBLISH_DIFF_ENABLE, publishDiffs, newPublishDiffs);
+ publishDiffs = newPublishDiffs;
+ }
+ }
}
}
diff --git a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java
index e3f4b17910b56..07a6fb3c3a736 100644
--- a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java
+++ b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java
@@ -33,6 +33,7 @@
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.internal.Nullable;
+import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -46,6 +47,8 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import static com.google.common.collect.Sets.newHashSet;
import static org.elasticsearch.cluster.ClusterState.Builder;
@@ -76,6 +79,11 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem
private static final ConcurrentMap clusterGroups = ConcurrentCollections.newConcurrentMap();
+ private volatile ClusterState lastProcessedClusterState;
+
+ private final Lock lock = new ReentrantLock();
+
+
@Inject
public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService,
DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings) {
@@ -274,7 +282,7 @@ public String nodeDescription() {
}
@Override
- public void publish(ClusterState clusterState, final Discovery.AckListener ackListener) {
+ public void publish(ClusterChangedEvent clusterChangedEvent, final Discovery.AckListener ackListener) {
if (!master) {
throw new ElasticsearchIllegalStateException("Shouldn't publish state when not master");
}
@@ -287,7 +295,7 @@ public void publish(ClusterState clusterState, final Discovery.AckListener ackLi
}
nodesToPublishTo.add(localDiscovery.localNode);
}
- publish(members, clusterState, new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener));
+ publish(members, clusterChangedEvent, new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener));
}
}
@@ -300,17 +308,44 @@ private LocalDiscovery[] members() {
return members.toArray(new LocalDiscovery[members.size()]);
}
- private void publish(LocalDiscovery[] members, ClusterState clusterState, final BlockingClusterStatePublishResponseHandler publishResponseHandler) {
+ private void publish(LocalDiscovery[] members, ClusterChangedEvent clusterChangedEvent, final BlockingClusterStatePublishResponseHandler publishResponseHandler) {
try {
// we do the marshaling intentionally, to check it works well...
- final byte[] clusterStateBytes = Builder.toBytes(clusterState);
+ byte[] clusterStateBytes = null;
+ byte[] clusterStateDiffBytes = null;
+ ClusterState clusterState = clusterChangedEvent.state();
for (final LocalDiscovery discovery : members) {
if (discovery.master) {
continue;
}
- final ClusterState nodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode);
+ ClusterState newNodeSpecificClusterState = null;
+ discovery.lock.lock();
+ try {
+ // we do the marshaling intentionally, to check it works well...
+ if (discovery.lastProcessedClusterState != null && clusterChangedEvent.previousState().nodes().nodeExists(discovery.localNode.id())) {
+ if (clusterStateDiffBytes == null) {
+ clusterStateDiffBytes = Builder.toDiffBytes(clusterChangedEvent.previousState(), clusterState);
+ }
+ try {
+ newNodeSpecificClusterState = discovery.lastProcessedClusterState.readDiffs(new BytesStreamInput(clusterStateDiffBytes), discovery.localNode);
+ logger.debug("sending diff cluster state version with size {} to [{}]", clusterStateDiffBytes.length, discovery.localNode.getName());
+ } catch (IncompatibleClusterStateVersionException ex) {
+ logger.warn("incompatible cluster state version - resending complete cluster state", ex);
+ }
+ }
+ if (newNodeSpecificClusterState == null) {
+ if (clusterStateBytes == null) {
+ clusterStateBytes = Builder.toBytes(clusterState);
+ }
+ newNodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode);
+ }
+ } finally {
+ discovery.lock.unlock();
+ }
+ final ClusterState nodeSpecificClusterState = newNodeSpecificClusterState;
+
nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
// ignore cluster state messages that do not include "me", not in the game yet...
if (nodeSpecificClusterState.nodes().localNode() != null) {
diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
index d91c7a2f127ad..f6422a800e0ee 100644
--- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
+++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
@@ -331,12 +331,12 @@ public boolean nodeHasJoinedClusterOnce() {
@Override
- public void publish(ClusterState clusterState, AckListener ackListener) {
- if (!clusterState.getNodes().localNodeMaster()) {
+ public void publish(ClusterChangedEvent clusterChangedEvent, AckListener ackListener) {
+ if (!clusterChangedEvent.state().getNodes().localNodeMaster()) {
throw new ElasticsearchIllegalStateException("Shouldn't publish state when not master");
}
- nodesFD.updateNodesAndPing(clusterState);
- publishClusterState.publish(clusterState, ackListener);
+ nodesFD.updateNodesAndPing(clusterChangedEvent.state());
+ publishClusterState.publish(clusterChangedEvent, ackListener);
}
/**
diff --git a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java
index e8352f389c579..fc9c6efa6a9e6 100644
--- a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java
+++ b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java
@@ -21,7 +21,10 @@
import com.google.common.collect.Maps;
import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.Diff;
+import org.elasticsearch.cluster.IncompatibleClusterStateVersionException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
@@ -40,10 +43,13 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
+import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/**
*
@@ -83,44 +89,62 @@ public void close() {
transportService.removeHandler(ACTION_NAME);
}
- public void publish(ClusterState clusterState, final Discovery.AckListener ackListener) {
- Set nodesToPublishTo = new HashSet<>(clusterState.nodes().size());
+ public void publish(ClusterChangedEvent clusterChangedEvent, final Discovery.AckListener ackListener) {
+ Set nodesToPublishTo = new HashSet<>(clusterChangedEvent.state().nodes().size());
DiscoveryNode localNode = nodesProvider.nodes().localNode();
- for (final DiscoveryNode node : clusterState.nodes()) {
+ for (final DiscoveryNode node : clusterChangedEvent.state().nodes()) {
if (node.equals(localNode)) {
continue;
}
nodesToPublishTo.add(node);
}
- publish(clusterState, nodesToPublishTo, new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener));
+ publish(clusterChangedEvent, nodesToPublishTo, new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener));
}
- private void publish(final ClusterState clusterState, final Set nodesToPublishTo,
+ private void publish(final ClusterChangedEvent clusterChangedEvent, final Set nodesToPublishTo,
final BlockingClusterStatePublishResponseHandler publishResponseHandler) {
Map serializedStates = Maps.newHashMap();
+ Map serializedDiffs = Maps.newHashMap();
final AtomicBoolean timedOutWaitingForNodes = new AtomicBoolean(false);
final TimeValue publishTimeout = discoverySettings.getPublishTimeout();
+ final boolean sendFullVersion = !discoverySettings.getPublishDiffs() || clusterChangedEvent.previousState() == null;
+ final ClusterState clusterState = clusterChangedEvent.state();
+ Diff diff = null;
for (final DiscoveryNode node : nodesToPublishTo) {
// try and serialize the cluster state once (or per version), so we don't serialize it
// per node when we send it over the wire, compress it while we are at it...
- BytesReference bytes = serializedStates.get(node.version());
- if (bytes == null) {
- try {
- BytesStreamOutput bStream = new BytesStreamOutput();
- StreamOutput stream = CompressorFactory.defaultCompressor().streamOutput(bStream);
- stream.setVersion(node.version());
- ClusterState.Builder.writeTo(clusterState, stream);
- stream.close();
- bytes = bStream.bytes();
- serializedStates.put(node.version(), bytes);
- } catch (Throwable e) {
- logger.warn("failed to serialize cluster_state before publishing it to node {}", e, node);
- publishResponseHandler.onFailure(node, e);
- continue;
+ BytesReference bytes;
+ boolean newlyAddedNode = !clusterChangedEvent.previousState().nodes().nodeExists(node.id());
+ if (sendFullVersion || newlyAddedNode) {
+ bytes = serializedStates.get(node.version());
+ if (bytes == null) {
+ try {
+ bytes = serializeFullClusterState(clusterState, node.version());
+ serializedStates.put(node.version(), bytes);
+ } catch (Throwable e) {
+ logger.warn("failed to serialize cluster_state before publishing it to node {}", e, node);
+ publishResponseHandler.onFailure(node, e);
+ continue;
+ }
+ }
+ } else {
+ bytes = serializedDiffs.get(node.version());
+ if (bytes == null) {
+ if (diff == null) {
+ diff = clusterChangedEvent.state().diffs(clusterChangedEvent.previousState());
+ }
+ try {
+ bytes = serializeDiffClusterState(diff, node.version());
+ serializedDiffs.put(node.version(), bytes);
+ } catch (Throwable e) {
+ logger.warn("failed to serialize cluster_state before publishing it to node {}", e, node);
+ publishResponseHandler.onFailure(node, e);
+ continue;
+ }
}
}
try {
@@ -143,8 +167,13 @@ public void handleResponse(TransportResponse.Empty response) {
@Override
public void handleException(TransportException exp) {
- logger.debug("failed to send cluster state to {}", exp, node);
- publishResponseHandler.onFailure(node, exp);
+ if (exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
+ logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage());
+ resendFullClusterState(clusterState, node, timedOutWaitingForNodes, publishResponseHandler);
+ } else {
+ logger.debug("failed to send cluster state to {}", exp, node);
+ publishResponseHandler.onFailure(node, exp);
+ }
}
});
} catch (Throwable t) {
@@ -171,7 +200,70 @@ public void handleException(TransportException exp) {
}
}
+ private void resendFullClusterState(final ClusterState clusterState, final DiscoveryNode node, final AtomicBoolean timedOutWaitingForNodes, final BlockingClusterStatePublishResponseHandler publishResponseHandler) {
+ final TimeValue publishTimeout = discoverySettings.getPublishTimeout();
+ BytesReference bytes;
+ try {
+ bytes = serializeFullClusterState(clusterState, node.version());
+ } catch (Throwable e) {
+ logger.warn("failed to serialize cluster_state before publishing it to node {}", e, node);
+ publishResponseHandler.onFailure(node, e);
+ return;
+ }
+ try {
+ TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false);
+ // no need to put a timeout on the options here, because we want the response to eventually be received
+ // and not log an error if it arrives after the timeout
+ transportService.sendRequest(node, ACTION_NAME,
+ new BytesTransportRequest(bytes, node.version()),
+ options, // no need to compress, we already compressed the bytes
+
+ new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
+
+ @Override
+ public void handleResponse(TransportResponse.Empty response) {
+ if (timedOutWaitingForNodes.get()) {
+ logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node, clusterState.version(), publishTimeout);
+ }
+ publishResponseHandler.onResponse(node);
+ }
+
+ @Override
+ public void handleException(TransportException exp) {
+ logger.debug("failed to send cluster state to {}", exp, node);
+ publishResponseHandler.onFailure(node, exp);
+ }
+ });
+ } catch (Throwable t) {
+ logger.debug("error sending cluster state to {}", t, node);
+ publishResponseHandler.onFailure(node, t);
+ }
+ }
+
+ private BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
+ BytesStreamOutput bStream = new BytesStreamOutput();
+ StreamOutput stream = CompressorFactory.defaultCompressor().streamOutput(bStream);
+ stream.setVersion(nodeVersion);
+ stream.writeBoolean(true);
+ ClusterState.Builder.writeTo(clusterState, stream);
+ stream.close();
+ return bStream.bytes();
+ }
+
+ private BytesReference serializeDiffClusterState(Diff diff, Version nodeVersion) throws IOException {
+ BytesStreamOutput bStream = new BytesStreamOutput();
+ StreamOutput stream = CompressorFactory.defaultCompressor().streamOutput(bStream);
+ stream.setVersion(nodeVersion);
+ stream.writeBoolean(false);
+ diff.writeTo(stream);
+ stream.close();
+ return bStream.bytes();
+ }
+
private class PublishClusterStateRequestHandler extends BaseTransportRequestHandler {
+ private volatile ClusterState clusterState;
+
+ private final Lock lock = new ReentrantLock();
@Override
public BytesTransportRequest newInstance() {
@@ -188,8 +280,24 @@ public void messageReceived(BytesTransportRequest request, final TransportChanne
in = request.bytes().streamInput();
}
in.setVersion(request.version());
- ClusterState clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
- clusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
+ lock.lock();
+ try {
+ if (in.readBoolean()) {
+ clusterState = ClusterState.Builder.readFrom(in, nodesProvider.nodes().localNode());
+ ;
+ logger.debug("received full cluster state version {} with size {}", clusterState.version(), request.bytes().length());
+ } else {
+ if (clusterState == null) {
+ logger.debug("received diffs for {}..{} but don't have any local cluster state - requesting full state");
+ throw new IncompatibleClusterStateVersionException("have no local cluster state");
+ }
+ clusterState = clusterState.readDiffs(in, nodesProvider.nodes().localNode());
+ }
+ clusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
+ } finally {
+ lock.unlock();
+ }
+
logger.debug("received cluster state version {}", clusterState.version());
try {
listener.onNewClusterState(clusterState, new NewClusterStateListener.NewStateProcessed() {
diff --git a/src/main/java/org/elasticsearch/gateway/Gateway.java b/src/main/java/org/elasticsearch/gateway/Gateway.java
index cd15bccdc4a64..1264f98550755 100644
--- a/src/main/java/org/elasticsearch/gateway/Gateway.java
+++ b/src/main/java/org/elasticsearch/gateway/Gateway.java
@@ -159,7 +159,7 @@ public void performStateRecovery(final GatewayStateRecoveredListener listener) t
}
ClusterState.Builder builder = ClusterState.builder(clusterName);
builder.metaData(metaDataBuilder);
- listener.onSuccess(builder.build());
+ listener.onSuccess(builder.randomUuid().build());
}
public void reset() throws Exception {
diff --git a/src/test/java/org/elasticsearch/cluster/serialization/DiffableTests.java b/src/test/java/org/elasticsearch/cluster/serialization/DiffableTests.java
new file mode 100644
index 0000000000000..4722115b1cbba
--- /dev/null
+++ b/src/test/java/org/elasticsearch/cluster/serialization/DiffableTests.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.serialization;
+
+import com.google.common.collect.ImmutableMap;
+import org.elasticsearch.cluster.Diff;
+import org.elasticsearch.cluster.DiffableUtils;
+import org.elasticsearch.cluster.DiffableUtils.KeyedReader;
+import org.elasticsearch.cluster.SimpleDiffable;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.io.stream.BytesStreamInput;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.test.ElasticsearchTestCase;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.google.common.collect.Maps.newHashMap;
+import static org.hamcrest.CoreMatchers.equalTo;
+
+public class DiffableTests extends ElasticsearchTestCase {
+
+ @Test
+ public void testImmutableMapDiffs() throws IOException {
+ ImmutableMap.Builder builder = ImmutableMap.builder();
+ builder.put("foo", new TestDiffable("1"));
+ builder.put("bar", new TestDiffable("2"));
+ builder.put("baz", new TestDiffable("3"));
+ ImmutableMap before = builder.build();
+ Map map = newHashMap();
+ map.putAll(before);
+ map.remove("bar");
+ map.put("baz", new TestDiffable("4"));
+ map.put("new", new TestDiffable("5"));
+ ImmutableMap after = ImmutableMap.copyOf(map);
+ Diff diffs = DiffableUtils.diff(before, after);
+ BytesStreamOutput out = new BytesStreamOutput();
+ diffs.writeTo(out);
+ BytesStreamInput in = new BytesStreamInput(out.bytes());
+ ImmutableMap serialized = DiffableUtils.readDiff(before, in, TestDiffable.PROTO);
+ assertThat(serialized.size(), equalTo(3));
+ assertThat(serialized.get("foo").value(), equalTo("1"));
+ assertThat(serialized.get("baz").value(), equalTo("4"));
+ assertThat(serialized.get("new").value(), equalTo("5"));
+ }
+
+ @Test
+ public void testImmutableOpenMapDiffs() throws IOException {
+ ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder();
+ builder.put("foo", new TestDiffable("1"));
+ builder.put("bar", new TestDiffable("2"));
+ builder.put("baz", new TestDiffable("3"));
+ ImmutableOpenMap before = builder.build();
+ builder = ImmutableOpenMap.builder(before);
+ builder.remove("bar");
+ builder.put("baz", new TestDiffable("4"));
+ builder.put("new", new TestDiffable("5"));
+ ImmutableOpenMap after = builder.build();
+ Diff diffs = DiffableUtils.diff(before, after);
+ BytesStreamOutput out = new BytesStreamOutput();
+ diffs.writeTo(out);
+ BytesStreamInput in = new BytesStreamInput(out.bytes());
+ ImmutableOpenMap serialized = DiffableUtils.readDiff(before, in, new KeyedReader() {
+ @Override
+ public TestDiffable readFrom(StreamInput in, String key) throws IOException {
+ return new TestDiffable(in.readString());
+ }
+ });
+ assertThat(serialized.size(), equalTo(3));
+ assertThat(serialized.get("foo").value(), equalTo("1"));
+ assertThat(serialized.get("baz").value(), equalTo("4"));
+ assertThat(serialized.get("new").value(), equalTo("5"));
+
+ }
+ public static class TestDiffable extends SimpleDiffable {
+
+ public static final TestDiffable PROTO = new TestDiffable("");
+
+ private final String value;
+
+ public TestDiffable(String value) {
+ this.value = value;
+ }
+
+ public String value() {
+ return value;
+ }
+
+ @Override
+ public TestDiffable readFrom(StreamInput in) throws IOException {
+ return new TestDiffable(in.readString());
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeString(value);
+ }
+ }
+
+}
diff --git a/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryTests.java b/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryTests.java
index 469da2078e0ef..982a5076571c3 100644
--- a/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryTests.java
+++ b/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryTests.java
@@ -197,6 +197,7 @@ public void testNodeRejectsClusterStateWithWrongMasterNode() throws Exception {
BytesStreamOutput bStream = new BytesStreamOutput();
StreamOutput stream = CompressorFactory.defaultCompressor().streamOutput(bStream);
stream.setVersion(node.version());
+ stream.writeBoolean(true); // To indicate that the full version is going to follow
ClusterState.Builder.writeTo(builder.build(), stream);
stream.close();
BytesReference bytes = bStream.bytes();
diff --git a/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java b/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java
index aea568bd69e24..fa69606b06df8 100644
--- a/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java
+++ b/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java
@@ -39,7 +39,9 @@
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
+import org.elasticsearch.cluster.SimpleDiffable;
import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.metadata.MetaData.Custom;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -713,7 +715,7 @@ private void createTestIndex(String name) {
));
}
- public static abstract class TestCustomMetaData implements MetaData.Custom {
+ public static abstract class TestCustomMetaData extends SimpleDiffable implements MetaData.Custom {
private final String data;
protected TestCustomMetaData(String data) {
@@ -821,6 +823,16 @@ public EnumSet context() {
return MetaData.API_AND_SNAPSHOT;
}
}
+
+ @Override
+ public Custom readFrom(StreamInput in) throws IOException {
+ return FACTORY.readFrom(in);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ FACTORY.writeTo(this, out);
+ }
}
public static class NonSnapshottableMetadata extends TestCustomMetaData {
@@ -844,6 +856,16 @@ protected NonSnapshottableMetadata newTestCustomMetaData(String data) {
return new NonSnapshottableMetadata(data);
}
}
+
+ @Override
+ public Custom readFrom(StreamInput in) throws IOException {
+ return FACTORY.readFrom(in);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ FACTORY.writeTo(this, out);
+ }
}
public static class SnapshottableGatewayMetadata extends TestCustomMetaData {
@@ -872,6 +894,16 @@ public EnumSet context() {
return EnumSet.of(MetaData.XContentContext.API, MetaData.XContentContext.SNAPSHOT, MetaData.XContentContext.GATEWAY);
}
}
+
+ @Override
+ public Custom readFrom(StreamInput in) throws IOException {
+ return FACTORY.readFrom(in);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ FACTORY.writeTo(this, out);
+ }
}
public static class NonSnapshottableGatewayMetadata extends TestCustomMetaData {
@@ -901,6 +933,17 @@ public EnumSet context() {
}
}
+
+ @Override
+ public Custom readFrom(StreamInput in) throws IOException {
+ return FACTORY.readFrom(in);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ FACTORY.writeTo(this, out);
+ }
+
}
public static class SnapshotableGatewayNoApiMetadata extends TestCustomMetaData {
@@ -930,6 +973,17 @@ public EnumSet context() {
}
}
+
+ @Override
+ public Custom readFrom(StreamInput in) throws IOException {
+ return FACTORY.readFrom(in);
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ FACTORY.writeTo(this, out);
+ }
+
}
}
diff --git a/src/test/java/org/elasticsearch/tribe/TribeTests.java b/src/test/java/org/elasticsearch/tribe/TribeTests.java
index c4871ac2fa63f..a6e38da6b1ad3 100644
--- a/src/test/java/org/elasticsearch/tribe/TribeTests.java
+++ b/src/test/java/org/elasticsearch/tribe/TribeTests.java
@@ -38,6 +38,7 @@
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.TestCluster;
+import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;