From ed3f3cacdd9f76b31175ed1ae67d20a043e5aec0 Mon Sep 17 00:00:00 2001 From: zyxxoo <1318247699@qq.com> Date: Mon, 15 Aug 2022 18:30:17 +0800 Subject: [PATCH 1/9] #1869 cluster role automatic management --- .../com/baidu/hugegraph/election/Config.java | 14 + .../baidu/hugegraph/election/MetaData.java | 45 +++ .../hugegraph/election/MetaDataAdapter.java | 11 + .../election/RoleElectionStateMachine.java | 9 + .../RoleElectionStateMachineImpl.java | 277 ++++++++++++++++++ .../election/StateMachineCallback.java | 16 + .../election/StateMachineContext.java | 17 ++ .../core/RoleElectionStateMachineTest.java | 214 ++++++++++++++ 8 files changed, 603 insertions(+) create mode 100644 hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java create mode 100644 hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java create mode 100644 hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaDataAdapter.java create mode 100644 hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java create mode 100644 hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java create mode 100644 hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java create mode 100644 hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java create mode 100644 hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java new file mode 100644 index 0000000000..aa49ecc8a9 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java @@ -0,0 +1,14 @@ +package com.baidu.hugegraph.election; + +public interface Config { + + String node(); + + int exceedsFailCount(); + + long randomTimeoutMillisecond(); + + long heartBeatIntervalSecond(); + + int exceedsWorkerCount(); +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java new file mode 100644 index 0000000000..684c2fb120 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java @@ -0,0 +1,45 @@ +package com.baidu.hugegraph.election; + +import java.util.Objects; + +public class MetaData { + + String node; + long count; + int epoch; + + public MetaData(String node, int epoch) { + this.node = node; + this.epoch = epoch; + this.count = 1; + } + + public void increaseCount() { + this.count ++; + } + + public boolean isMaster(String node) { + return Objects.equals(this.node, node); + } + + public int epoch() { + return this.epoch; + } + + public long count() { + return this.count; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof MetaData)) return false; + MetaData metaData = (MetaData) o; + return count == metaData.count && epoch == metaData.epoch && Objects.equals(node, metaData.node); + } + + @Override + public int hashCode() { + return Objects.hash(node, count, epoch); + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaDataAdapter.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaDataAdapter.java new file mode 100644 index 0000000000..c879c1f6df --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaDataAdapter.java @@ -0,0 +1,11 @@ +package com.baidu.hugegraph.election; + +import java.util.Optional; + +public interface MetaDataAdapter { + boolean postDelyIfPresent(MetaData metaData, long delySecond); + + Optional queryDelay(long delySecond); + + Optional query(); +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java new file mode 100644 index 0000000000..666c836a70 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java @@ -0,0 +1,9 @@ +package com.baidu.hugegraph.election; + +public interface RoleElectionStateMachine { + + void shutdown(); + + void apply(StateMachineCallback stateMachineCallback); + +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java new file mode 100644 index 0000000000..b8d96026b1 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java @@ -0,0 +1,277 @@ +package com.baidu.hugegraph.election; + +import java.util.Optional; +import java.util.concurrent.locks.LockSupport; + +import com.baidu.hugegraph.util.E; + +public class RoleElectionStateMachineImpl implements RoleElectionStateMachine{ + + private volatile boolean shutdown = false; + private Config config; + + private volatile RoleState state; + + private final MetaDataAdapter metaDataAdapter; + + public RoleElectionStateMachineImpl(Config config, MetaDataAdapter adapter) { + this.config = config; + this.metaDataAdapter = adapter; + this.state = new UnKnownState(null); + } + + @Override + public void shutdown() { + this.shutdown = true; + } + + @Override + public void apply(StateMachineCallback stateMachineCallback) { + int failCount = 0; + while (!this.shutdown) { + E.checkArgumentNotNull(this.state, "State don't be null"); + StateMachineContextImpl context = new StateMachineContextImpl(this); + try { + this.state = state.transform(context); + Callback runnable = this.state.callback(stateMachineCallback); + runnable.call(context); + failCount = 0; + } catch (Throwable e) { + stateMachineCallback.error(context, e); + failCount ++; + if (failCount >= this.config.exceedsFailCount()) { + this.state = new SafeState(context.epoch()); + Callback runnable = this.state.callback(stateMachineCallback); + runnable.call(context); + } + } + } + } + + private interface RoleState { + + RoleState transform(StateMachineContext context); + + Callback callback(StateMachineCallback callback); + + static void heartBeatPark(StateMachineContext context) { + long heartBeatIntervalSecond = context.config().heartBeatIntervalSecond(); + LockSupport.parkNanos(heartBeatIntervalSecond * 1_000_000_000); + } + + static void randomPark(StateMachineContext context) { + long randomTimeout = context.config().randomTimeoutMillisecond(); + LockSupport.parkNanos(randomTimeout * 1_000_000); + } + } + + @FunctionalInterface + private interface Callback { + + void call(StateMachineContext context); + } + + private static class UnKnownState implements RoleState { + + Integer epoch; + + public UnKnownState(Integer epoch) { + this.epoch = epoch; + } + + @Override + public RoleState transform(StateMachineContext context) { + MetaDataAdapter adapter = context.adapter(); + Optional metaDataOpt = adapter.query(); + if (!metaDataOpt.isPresent()) { + context.reset(); + return new CandidateState(epoch == null ? 1 : epoch + 1); + } + + MetaData metaData = metaDataOpt.get(); + context.epoch(metaData.epoch()); + if (metaData.isMaster(context.node())) { + return new MasterState(metaData); + } else { + return new WorkerState(metaData); + } + } + + @Override + public Callback callback(StateMachineCallback callback) { + return callback::unknown; + } + } + + private static class SafeState implements RoleState { + + Integer epoch; + + public SafeState(Integer epoch) { + this.epoch = epoch; + } + + @Override + public RoleState transform(StateMachineContext context) { + RoleState.heartBeatPark(context); + return new UnKnownState(this.epoch).transform(context); + } + + @Override + public Callback callback(StateMachineCallback callback) { + return callback::safe; + } + } + + private static class MasterState implements RoleState { + + MetaData metaData; + + public MasterState(MetaData metaData) { + this.metaData = metaData; + } + + @Override + public RoleState transform(StateMachineContext context) { + this.metaData.increaseCount(); + RoleState.heartBeatPark(context); + if (context.adapter().postDelyIfPresent(this.metaData, -1)) { + return this; + } + context.reset(); + return new UnKnownState(this.metaData.epoch()); + } + + @Override + public Callback callback(StateMachineCallback callback) { + return callback::master; + } + } + + private static class WorkerState implements RoleState { + + private MetaData metaData; + private int count = 0; + + public WorkerState(MetaData metaData) { + this.metaData = metaData; + } + + @Override + public RoleState transform(StateMachineContext context) { + RoleState.heartBeatPark(context); + RoleState nextState = new UnKnownState(this.metaData.epoch()).transform(context); + if (nextState instanceof WorkerState) { + this.merge((WorkerState) nextState); + if (this.count > context.config().exceedsWorkerCount()) { + return new CandidateState(this.metaData.epoch() + 1); + } else { + return this; + } + } else { + return nextState; + } + } + + @Override + public Callback callback(StateMachineCallback callback) { + return callback::worker; + } + + public void merge(WorkerState state) { + if (state.metaData.epoch() > this.metaData.epoch()) { + this.count = 0; + this.metaData = state.metaData; + } else if (state.metaData.epoch() < this.metaData.epoch()){ + throw new IllegalStateException("Epoch must increase"); + } else if (state.metaData.epoch() == this.metaData.epoch() && + state.metaData.count() < this.metaData.count()) { + throw new IllegalStateException("Meta count must increase"); + } else if (state.metaData.epoch() == this.metaData.epoch() && + state.metaData.count() > this.metaData.count()) { + this.metaData = state.metaData; + } else { + this.count ++; + } + } + } + + private static class CandidateState implements RoleState { + + Integer epoch; + + public CandidateState(Integer epoch) { + this.epoch = epoch; + } + + @Override + public RoleState transform(StateMachineContext context) { + RoleState.randomPark(context); + int epoch = this.epoch == null ? 1 : this.epoch; + MetaData metaData = new MetaData(context.config().node(), epoch); + //failover to master success + context.epoch(metaData.epoch()); + if (context.adapter().postDelyIfPresent(metaData, -1)) { + return new MasterState(metaData); + } else { + return new WorkerState(metaData); + } + } + + @Override + public Callback callback(StateMachineCallback callback) { + return callback::candidate; + } + } + + private static class StateMachineContextImpl implements StateMachineContext { + + private Integer epoch; + private final String node; + private final RoleElectionStateMachineImpl machine; + + public StateMachineContextImpl(RoleElectionStateMachineImpl machine) { + this.node = machine.config.node(); + this.machine = machine; + } + + @Override + public Integer epoch() { + return this.epoch; + } + + @Override + public String node() { + return this.node; + } + + @Override + public void epoch(Integer epoch) { + this.epoch = epoch; + } + + @Override + public MetaDataAdapter adapter() { + return this.machine.adapter(); + } + + @Override + public Config config() { + return this.machine.config; + } + + @Override + public RoleElectionStateMachine stateMachine() { + return this.machine; + } + + @Override + public void reset() { + this.epoch = null; + } + } + + protected MetaDataAdapter adapter() { + return this.metaDataAdapter; + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java new file mode 100644 index 0000000000..b765c455c9 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java @@ -0,0 +1,16 @@ +package com.baidu.hugegraph.election; + +public interface StateMachineCallback { + + void master(StateMachineContext context); + + void worker(StateMachineContext context); + + void candidate(StateMachineContext context); + + void unknown(StateMachineContext context); + + void safe(StateMachineContext context); + + void error(StateMachineContext context, Throwable e); +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java new file mode 100644 index 0000000000..9b4ca07ab9 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java @@ -0,0 +1,17 @@ +package com.baidu.hugegraph.election; + +public interface StateMachineContext { + Integer epoch(); + + String node(); + + RoleElectionStateMachine stateMachine(); + + void epoch(Integer epoch); + + Config config(); + + MetaDataAdapter adapter(); + + void reset(); +} diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java new file mode 100644 index 0000000000..e35952a8fe --- /dev/null +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java @@ -0,0 +1,214 @@ +package com.baidu.hugegraph.core; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.LockSupport; + +import com.baidu.hugegraph.election.Config; +import com.baidu.hugegraph.election.MetaData; +import com.baidu.hugegraph.election.MetaDataAdapter; +import com.baidu.hugegraph.election.RoleElectionStateMachine; +import com.baidu.hugegraph.election.RoleElectionStateMachineImpl; +import com.baidu.hugegraph.election.StateMachineCallback; +import com.baidu.hugegraph.election.StateMachineContext; + +import org.junit.Assert; +import org.junit.Test; + +public class RoleElectionStateMachineTest { + + public static class LogEntry { + + Integer epoch; + String node; + + Role role; + + enum Role { + master, + worker, + candidate, + safe, + unknown + } + + public LogEntry(Integer epoch, String node, Role role) { + this.epoch = epoch; + this.node = node; + this.role = role; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof LogEntry)) return false; + LogEntry logEntry = (LogEntry) o; + return Objects.equals(epoch, logEntry.epoch) && Objects.equals(node, logEntry.node) && role == logEntry.role; + } + + @Override + public int hashCode() { + return Objects.hash(epoch, node, role); + } + + @Override + public String toString() { + return "LogEntry{" + + "epoch=" + epoch + + ", node='" + node + '\'' + + ", role=" + role + + '}'; + } + } + + private static class TestConfig implements Config { + + String node; + + public TestConfig(String node) { + this.node = node; + } + + @Override + public String node() { + return this.node; + } + + @Override + public int exceedsFailCount() { + return 10; + } + + @Override + public long randomTimeoutMillisecond() { + return 400; + } + + @Override + public long heartBeatIntervalSecond() { + return 1; + } + + @Override + public int exceedsWorkerCount() { + return 5; + } + } + + @Test + public void testStateMachine() throws InterruptedException { + final CountDownLatch stop = new CountDownLatch(3); + final List logRecords = Collections.synchronizedList(new ArrayList<>(20)); + final StateMachineCallback callback = new StateMachineCallback() { + + @Override + public void master(StateMachineContext context) { + Integer epochId = context.epoch(); + String node = context.node(); + logRecords.add(new LogEntry(epochId, node, LogEntry.Role.master)); + } + + @Override + public void worker(StateMachineContext context) { + Integer epochId = context.epoch(); + String node = context.node(); + logRecords.add(new LogEntry(epochId, node, LogEntry.Role.worker)); + } + + @Override + public void candidate(StateMachineContext context) { + Integer epochId = context.epoch(); + String node = context.node(); + logRecords.add(new LogEntry(epochId, node, LogEntry.Role.candidate)); + } + + @Override + public void unknown(StateMachineContext context) { + Integer epochId = context.epoch(); + String node = context.node(); + logRecords.add(new LogEntry(epochId, node, LogEntry.Role.candidate)); + } + + @Override + public void safe(StateMachineContext context) { + Integer epochId = context.epoch(); + String node = context.node(); + logRecords.add(new LogEntry(epochId, node, LogEntry.Role.safe)); + } + + @Override + public void error(StateMachineContext context, Throwable e) { + + } + }; + final MetaDataAdapter adapter = new MetaDataAdapter() { + int epoch = 0; + Map data = new ConcurrentHashMap<>(); + @Override + public boolean postDelyIfPresent(MetaData metaData, long delySecond) { + LockSupport.parkNanos(delySecond * 1_000_000_000); + MetaData oldData = data.computeIfAbsent(metaData.epoch(), (key) -> { + this.epoch = key; + return metaData; + }); + return oldData == metaData; + } + + @Override + public Optional queryDelay(long delySecond) { + LockSupport.parkNanos(delySecond * 1_000_000_000); + return Optional.ofNullable(this.data.get(this.epoch)); + } + + @Override + public Optional query() { + return Optional.ofNullable(this.data.get(this.epoch)); + } + }; + + Thread node1 = new Thread(() -> { + Config config = new TestConfig("1"); + RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter); + stateMachine.apply(callback); + stop.countDown(); + }); + + Thread node2 = new Thread(() -> { + Config config = new TestConfig("2"); + RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter); + stateMachine.apply(callback); + stop.countDown(); + }); + + Thread node3 = new Thread(() -> { + Config config = new TestConfig("3"); + RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter); + stateMachine.apply(callback); + stop.countDown(); + }); + + node1.start(); + node2.start(); + node3.start(); + + stop.await(); + + Assert.assertTrue(logRecords.size() > 100); + Map masters = new HashMap<>(); + for (LogEntry entry: logRecords) { + if (entry.role == LogEntry.Role.master) { + String lastNode = masters.putIfAbsent(entry.epoch, entry.node); + Assert.assertEquals(lastNode, entry.node); + } + } + + Assert.assertTrue(masters.size() > 0); + } +} From 99690d800e382803e309f39a33bc93cb050e4e5c Mon Sep 17 00:00:00 2001 From: zyxxoo <1318247699@qq.com> Date: Tue, 16 Aug 2022 09:22:04 +0800 Subject: [PATCH 2/9] improve code --- .../src/main/java/com/baidu/hugegraph/election/MetaData.java | 2 +- .../java/com/baidu/hugegraph/election/MetaDataAdapter.java | 1 + .../hugegraph/election/RoleElectionStateMachineImpl.java | 5 +++-- .../com/baidu/hugegraph/election/StateMachineContext.java | 1 + 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java index 684c2fb120..845971f0ce 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java @@ -15,7 +15,7 @@ public MetaData(String node, int epoch) { } public void increaseCount() { - this.count ++; + this.count++; } public boolean isMaster(String node) { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaDataAdapter.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaDataAdapter.java index c879c1f6df..196183bb76 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaDataAdapter.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaDataAdapter.java @@ -3,6 +3,7 @@ import java.util.Optional; public interface MetaDataAdapter { + boolean postDelyIfPresent(MetaData metaData, long delySecond); Optional queryDelay(long delySecond); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java index b8d96026b1..8d2b4f9c18 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java @@ -5,7 +5,7 @@ import com.baidu.hugegraph.util.E; -public class RoleElectionStateMachineImpl implements RoleElectionStateMachine{ +public class RoleElectionStateMachineImpl implements RoleElectionStateMachine { private volatile boolean shutdown = false; private Config config; @@ -189,9 +189,10 @@ public void merge(WorkerState state) { throw new IllegalStateException("Meta count must increase"); } else if (state.metaData.epoch() == this.metaData.epoch() && state.metaData.count() > this.metaData.count()) { + this.count = 0; this.metaData = state.metaData; } else { - this.count ++; + this.count++; } } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java index 9b4ca07ab9..74b37f337d 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java @@ -1,6 +1,7 @@ package com.baidu.hugegraph.election; public interface StateMachineContext { + Integer epoch(); String node(); From a0a08787ed344e0c02fc8a69805f743648d2c3af Mon Sep 17 00:00:00 2001 From: zyxxoo <1318247699@qq.com> Date: Tue, 16 Aug 2022 10:12:08 +0800 Subject: [PATCH 3/9] FIX bug --- .../RoleElectionStateMachineImpl.java | 7 +++-- .../core/RoleElectionStateMachineTest.java | 31 ++++++++++++++++--- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java index 8d2b4f9c18..8952c321f1 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java @@ -28,9 +28,9 @@ public void shutdown() { @Override public void apply(StateMachineCallback stateMachineCallback) { int failCount = 0; + StateMachineContextImpl context = new StateMachineContextImpl(this); while (!this.shutdown) { E.checkArgumentNotNull(this.state, "State don't be null"); - StateMachineContextImpl context = new StateMachineContextImpl(this); try { this.state = state.transform(context); Callback runnable = this.state.callback(stateMachineCallback); @@ -85,7 +85,9 @@ public RoleState transform(StateMachineContext context) { Optional metaDataOpt = adapter.query(); if (!metaDataOpt.isPresent()) { context.reset(); - return new CandidateState(epoch == null ? 1 : epoch + 1); + this.epoch = this.epoch == null ? 1 : this.epoch + 1; + context.epoch(this.epoch); + return new CandidateState(this.epoch); } MetaData metaData = metaDataOpt.get(); @@ -139,6 +141,7 @@ public RoleState transform(StateMachineContext context) { return this; } context.reset(); + context.epoch(this.metaData.epoch()); return new UnKnownState(this.metaData.epoch()); } diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java index e35952a8fe..0956387b21 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java @@ -1,5 +1,6 @@ package com.baidu.hugegraph.core; +import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -9,6 +10,7 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.LockSupport; import com.baidu.hugegraph.election.Config; @@ -105,7 +107,8 @@ public int exceedsWorkerCount() { @Test public void testStateMachine() throws InterruptedException { final CountDownLatch stop = new CountDownLatch(3); - final List logRecords = Collections.synchronizedList(new ArrayList<>(20)); + final int MAX_COUNT = 100; + final List logRecords = Collections.synchronizedList(new ArrayList<>(MAX_COUNT)); final StateMachineCallback callback = new StateMachineCallback() { @Override @@ -113,6 +116,9 @@ public void master(StateMachineContext context) { Integer epochId = context.epoch(); String node = context.node(); logRecords.add(new LogEntry(epochId, node, LogEntry.Role.master)); + if (logRecords.size() > MAX_COUNT) { + context.stateMachine().shutdown(); + } } @Override @@ -120,6 +126,9 @@ public void worker(StateMachineContext context) { Integer epochId = context.epoch(); String node = context.node(); logRecords.add(new LogEntry(epochId, node, LogEntry.Role.worker)); + if (logRecords.size() > MAX_COUNT) { + context.stateMachine().shutdown(); + } } @Override @@ -127,13 +136,19 @@ public void candidate(StateMachineContext context) { Integer epochId = context.epoch(); String node = context.node(); logRecords.add(new LogEntry(epochId, node, LogEntry.Role.candidate)); + if (logRecords.size() > MAX_COUNT) { + context.stateMachine().shutdown(); + } } @Override public void unknown(StateMachineContext context) { Integer epochId = context.epoch(); String node = context.node(); - logRecords.add(new LogEntry(epochId, node, LogEntry.Role.candidate)); + logRecords.add(new LogEntry(epochId, node, LogEntry.Role.unknown)); + if (logRecords.size() > MAX_COUNT) { + context.stateMachine().shutdown(); + } } @Override @@ -141,6 +156,9 @@ public void safe(StateMachineContext context) { Integer epochId = context.epoch(); String node = context.node(); logRecords.add(new LogEntry(epochId, node, LogEntry.Role.safe)); + if (logRecords.size() > MAX_COUNT) { + context.stateMachine().shutdown(); + } } @Override @@ -150,12 +168,17 @@ public void error(StateMachineContext context, Throwable e) { }; final MetaDataAdapter adapter = new MetaDataAdapter() { int epoch = 0; + int count = 0; Map data = new ConcurrentHashMap<>(); @Override public boolean postDelyIfPresent(MetaData metaData, long delySecond) { + this.count ++; LockSupport.parkNanos(delySecond * 1_000_000_000); + if (count > 10) { + throw new RuntimeException("timeout"); + } MetaData oldData = data.computeIfAbsent(metaData.epoch(), (key) -> { - this.epoch = key; + this.epoch = Math.max(key, epoch); return metaData; }); return oldData == metaData; @@ -200,7 +223,7 @@ public Optional query() { stop.await(); - Assert.assertTrue(logRecords.size() > 100); + Assert.assertTrue(logRecords.size() > MAX_COUNT); Map masters = new HashMap<>(); for (LogEntry entry: logRecords) { if (entry.role == LogEntry.Role.master) { From faadb235471efef98ffd37d0a30a5308671f97e1 Mon Sep 17 00:00:00 2001 From: zyxxoo <1318247699@qq.com> Date: Tue, 16 Aug 2022 16:02:31 +0800 Subject: [PATCH 4/9] improve code --- .../com/baidu/hugegraph/election/Config.java | 2 + .../baidu/hugegraph/election/MetaData.java | 23 +++- .../election/RoleElectionStateMachine.java | 1 - .../RoleElectionStateMachineImpl.java | 16 ++- .../baidu/hugegraph/core/CoreTestSuite.java | 3 +- .../core/RoleElectionStateMachineTest.java | 110 ++++++++++++++---- 6 files changed, 127 insertions(+), 28 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java index aa49ecc8a9..e85658f3a6 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java @@ -11,4 +11,6 @@ public interface Config { long heartBeatIntervalSecond(); int exceedsWorkerCount(); + + long baseTimeoutMillisecond(); } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java index 845971f0ce..e42b571543 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java @@ -9,9 +9,13 @@ public class MetaData { int epoch; public MetaData(String node, int epoch) { + this(node, epoch, 1); + } + + public MetaData(String node, int epoch, long count) { this.node = node; this.epoch = epoch; - this.count = 1; + this.count = count; } public void increaseCount() { @@ -30,6 +34,14 @@ public long count() { return this.count; } + public void count(long count) { + this.count = count; + } + + public String node() { + return this.node; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -42,4 +54,13 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(node, count, epoch); } + + @Override + public String toString() { + return "MetaData{" + + "node='" + node + '\'' + + ", count=" + count + + ", epoch=" + epoch + + '}'; + } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java index 666c836a70..51933a93d7 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java @@ -5,5 +5,4 @@ public interface RoleElectionStateMachine { void shutdown(); void apply(StateMachineCallback stateMachineCallback); - } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java index 8952c321f1..d79c1c0d0b 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java @@ -1,5 +1,6 @@ package com.baidu.hugegraph.election; +import java.security.SecureRandom; import java.util.Optional; import java.util.concurrent.locks.LockSupport; @@ -50,6 +51,8 @@ public void apply(StateMachineCallback stateMachineCallback) { private interface RoleState { + SecureRandom secureRandom = new SecureRandom(); + RoleState transform(StateMachineContext context); Callback callback(StateMachineCallback callback); @@ -61,7 +64,9 @@ static void heartBeatPark(StateMachineContext context) { static void randomPark(StateMachineContext context) { long randomTimeout = context.config().randomTimeoutMillisecond(); - LockSupport.parkNanos(randomTimeout * 1_000_000); + long baseTime = context.config().baseTimeoutMillisecond(); + long timeout = (long) (baseTime + (randomTimeout / 10.0 * secureRandom.nextInt(11))); + LockSupport.parkNanos(timeout * 1_000_000); } } @@ -91,6 +96,13 @@ public RoleState transform(StateMachineContext context) { } MetaData metaData = metaDataOpt.get(); + if (this.epoch != null && metaData.epoch() < this.epoch) { + context.reset(); + this.epoch = this.epoch == null ? 1 : this.epoch + 1; + context.epoch(this.epoch); + return new CandidateState(this.epoch); + } + context.epoch(metaData.epoch()); if (metaData.isMaster(context.node())) { return new MasterState(metaData); @@ -142,7 +154,7 @@ public RoleState transform(StateMachineContext context) { } context.reset(); context.epoch(this.metaData.epoch()); - return new UnKnownState(this.metaData.epoch()); + return new UnKnownState(this.metaData.epoch()).transform(context); } @Override diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/CoreTestSuite.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/CoreTestSuite.java index 509cdf814a..d6dd4c350e 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/CoreTestSuite.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/CoreTestSuite.java @@ -49,7 +49,8 @@ TaskCoreTest.class, AuthTest.class, MultiGraphsTest.class, - RamTableTest.class + RamTableTest.class, + RoleElectionStateMachineTest.class }) public class CoreTestSuite { diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java index 0956387b21..177aeb9d0e 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java @@ -1,18 +1,21 @@ package com.baidu.hugegraph.core; -import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.LockSupport; +import org.junit.Assert; +import org.junit.Test; + import com.baidu.hugegraph.election.Config; import com.baidu.hugegraph.election.MetaData; import com.baidu.hugegraph.election.MetaDataAdapter; @@ -21,9 +24,6 @@ import com.baidu.hugegraph.election.StateMachineCallback; import com.baidu.hugegraph.election.StateMachineContext; -import org.junit.Assert; -import org.junit.Test; - public class RoleElectionStateMachineTest { public static class LogEntry { @@ -85,7 +85,7 @@ public String node() { @Override public int exceedsFailCount() { - return 10; + return 2; } @Override @@ -102,13 +102,19 @@ public long heartBeatIntervalSecond() { public int exceedsWorkerCount() { return 5; } + + @Override + public long baseTimeoutMillisecond() { + return 100; + } } @Test public void testStateMachine() throws InterruptedException { - final CountDownLatch stop = new CountDownLatch(3); - final int MAX_COUNT = 100; + final CountDownLatch stop = new CountDownLatch(4); + final int MAX_COUNT = 200; final List logRecords = Collections.synchronizedList(new ArrayList<>(MAX_COUNT)); + final List masterNodes = Collections.synchronizedList(new ArrayList<>(MAX_COUNT)); final StateMachineCallback callback = new StateMachineCallback() { @Override @@ -119,6 +125,8 @@ public void master(StateMachineContext context) { if (logRecords.size() > MAX_COUNT) { context.stateMachine().shutdown(); } + System.out.println("----master " + node); + masterNodes.add(node); } @Override @@ -163,42 +171,74 @@ public void safe(StateMachineContext context) { @Override public void error(StateMachineContext context, Throwable e) { - + System.out.println("----" + context.node() + " " + e.getMessage()); } }; + + final List metaDataLogs = Collections.synchronizedList(new ArrayList<>(100)); final MetaDataAdapter adapter = new MetaDataAdapter() { - int epoch = 0; - int count = 0; + volatile int epoch = 0; Map data = new ConcurrentHashMap<>(); + + MetaData copy(MetaData metaData) { + if (metaData == null) { + return null; + } + return new MetaData(metaData.node(), metaData.epoch(), metaData.count()); + } @Override public boolean postDelyIfPresent(MetaData metaData, long delySecond) { - this.count ++; - LockSupport.parkNanos(delySecond * 1_000_000_000); - if (count > 10) { - throw new RuntimeException("timeout"); + if (delySecond > 0) { + LockSupport.parkNanos(delySecond * 1_000_000_000); + } + if (metaData.epoch() < this.epoch) { + return false; } - MetaData oldData = data.computeIfAbsent(metaData.epoch(), (key) -> { - this.epoch = Math.max(key, epoch); - return metaData; + + MetaData copy = this.copy(metaData); + MetaData newData = data.compute(copy.epoch(), (key, value) -> { + if (copy.epoch() > this.epoch) { + this.epoch = copy.epoch(); + Assert.assertNull(value); + metaDataLogs.add(copy); + System.out.println("----1" + copy); + return copy; + } + + Assert.assertEquals(value.epoch(), copy.epoch()); + if (Objects.equals(value.node(), copy.node()) && + value.count() <= copy.count()) { + System.out.println("----2" + copy); + metaDataLogs.add(copy); + if (value.count() == copy.count()) { + Exception e = new Exception("eq"); + e.printStackTrace(); + } + return copy; + } + return value; + }); - return oldData == metaData; + return Objects.equals(newData, copy); } @Override public Optional queryDelay(long delySecond) { LockSupport.parkNanos(delySecond * 1_000_000_000); - return Optional.ofNullable(this.data.get(this.epoch)); + return Optional.ofNullable(this.copy(this.data.get(this.epoch))); } @Override public Optional query() { - return Optional.ofNullable(this.data.get(this.epoch)); + return Optional.ofNullable(this.copy(this.data.get(this.epoch))); } }; + RoleElectionStateMachine[] machines = new RoleElectionStateMachine[4]; Thread node1 = new Thread(() -> { Config config = new TestConfig("1"); RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter); + machines[1] = stateMachine; stateMachine.apply(callback); stop.countDown(); }); @@ -206,6 +246,7 @@ public Optional query() { Thread node2 = new Thread(() -> { Config config = new TestConfig("2"); RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter); + machines[2] = stateMachine; stateMachine.apply(callback); stop.countDown(); }); @@ -213,6 +254,7 @@ public Optional query() { Thread node3 = new Thread(() -> { Config config = new TestConfig("3"); RoleElectionStateMachine stateMachine = new RoleElectionStateMachineImpl(config, adapter); + machines[3] = stateMachine; stateMachine.apply(callback); stop.countDown(); }); @@ -221,14 +263,36 @@ public Optional query() { node2.start(); node3.start(); + Thread randomShutdown = new Thread(() -> { + Set dropNodes = new HashSet<>(); + while (dropNodes.size() < 3) { + LockSupport.parkNanos(5_000_000_000L); + int size = masterNodes.size(); + if (size < 1) { + continue; + } + String node = masterNodes.get(size - 1); + if (dropNodes.contains(node)) { + continue; + } + machines[Integer.parseInt(node)].shutdown(); + dropNodes.add(node); + System.out.println("----shutdown machine " + node); + } + stop.countDown(); + }); + + randomShutdown.start(); stop.await(); - Assert.assertTrue(logRecords.size() > MAX_COUNT); + Assert.assertTrue(logRecords.size() > 0); Map masters = new HashMap<>(); for (LogEntry entry: logRecords) { if (entry.role == LogEntry.Role.master) { String lastNode = masters.putIfAbsent(entry.epoch, entry.node); - Assert.assertEquals(lastNode, entry.node); + if (lastNode != null) { + Assert.assertEquals(lastNode, entry.node); + } } } From f342b47d62579e83a04b56cf86a8427909b3cc4d Mon Sep 17 00:00:00 2001 From: zyxxoo <1318247699@qq.com> Date: Tue, 16 Aug 2022 16:41:01 +0800 Subject: [PATCH 5/9] improve code --- .../com/baidu/hugegraph/election/Config.java | 19 +++ .../baidu/hugegraph/election/MetaData.java | 66 ---------- .../hugegraph/election/MetaDataAdapter.java | 12 -- .../election/RoleElectionStateMachine.java | 19 +++ .../RoleElectionStateMachineImpl.java | 121 ++++++++++-------- .../election/RoleStataDataAdapter.java | 31 +++++ .../hugegraph/election/RoleStateData.java | 91 +++++++++++++ .../election/StateMachineCallback.java | 19 +++ .../election/StateMachineContext.java | 21 ++- .../core/RoleElectionStateMachineTest.java | 57 ++++++--- 10 files changed, 308 insertions(+), 148 deletions(-) delete mode 100644 hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java delete mode 100644 hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaDataAdapter.java create mode 100644 hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleStataDataAdapter.java create mode 100644 hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleStateData.java diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java index e85658f3a6..ad39c0b5ef 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/Config.java @@ -1,3 +1,22 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + package com.baidu.hugegraph.election; public interface Config { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java deleted file mode 100644 index e42b571543..0000000000 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaData.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.baidu.hugegraph.election; - -import java.util.Objects; - -public class MetaData { - - String node; - long count; - int epoch; - - public MetaData(String node, int epoch) { - this(node, epoch, 1); - } - - public MetaData(String node, int epoch, long count) { - this.node = node; - this.epoch = epoch; - this.count = count; - } - - public void increaseCount() { - this.count++; - } - - public boolean isMaster(String node) { - return Objects.equals(this.node, node); - } - - public int epoch() { - return this.epoch; - } - - public long count() { - return this.count; - } - - public void count(long count) { - this.count = count; - } - - public String node() { - return this.node; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof MetaData)) return false; - MetaData metaData = (MetaData) o; - return count == metaData.count && epoch == metaData.epoch && Objects.equals(node, metaData.node); - } - - @Override - public int hashCode() { - return Objects.hash(node, count, epoch); - } - - @Override - public String toString() { - return "MetaData{" + - "node='" + node + '\'' + - ", count=" + count + - ", epoch=" + epoch + - '}'; - } -} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaDataAdapter.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaDataAdapter.java deleted file mode 100644 index 196183bb76..0000000000 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/MetaDataAdapter.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.baidu.hugegraph.election; - -import java.util.Optional; - -public interface MetaDataAdapter { - - boolean postDelyIfPresent(MetaData metaData, long delySecond); - - Optional queryDelay(long delySecond); - - Optional query(); -} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java index 51933a93d7..4bc258623b 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachine.java @@ -1,3 +1,22 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + package com.baidu.hugegraph.election; public interface RoleElectionStateMachine { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java index d79c1c0d0b..ffb535d233 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java @@ -1,3 +1,22 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + package com.baidu.hugegraph.election; import java.security.SecureRandom; @@ -9,15 +28,13 @@ public class RoleElectionStateMachineImpl implements RoleElectionStateMachine { private volatile boolean shutdown = false; - private Config config; - + private final Config config; private volatile RoleState state; + private final RoleStataDataAdapter roleStataDataAdapter; - private final MetaDataAdapter metaDataAdapter; - - public RoleElectionStateMachineImpl(Config config, MetaDataAdapter adapter) { + public RoleElectionStateMachineImpl(Config config, RoleStataDataAdapter adapter) { this.config = config; - this.metaDataAdapter = adapter; + this.roleStataDataAdapter = adapter; this.state = new UnKnownState(null); } @@ -78,7 +95,7 @@ private interface Callback { private static class UnKnownState implements RoleState { - Integer epoch; + final Integer epoch; public UnKnownState(Integer epoch) { this.epoch = epoch; @@ -86,28 +103,28 @@ public UnKnownState(Integer epoch) { @Override public RoleState transform(StateMachineContext context) { - MetaDataAdapter adapter = context.adapter(); - Optional metaDataOpt = adapter.query(); - if (!metaDataOpt.isPresent()) { + RoleStataDataAdapter adapter = context.adapter(); + Optional stateDataOpt = adapter.query(); + if (!stateDataOpt.isPresent()) { context.reset(); - this.epoch = this.epoch == null ? 1 : this.epoch + 1; - context.epoch(this.epoch); - return new CandidateState(this.epoch); + Integer nextEpoch = this.epoch == null ? 1 : this.epoch + 1; + context.epoch(nextEpoch); + return new CandidateState(nextEpoch); } - MetaData metaData = metaDataOpt.get(); - if (this.epoch != null && metaData.epoch() < this.epoch) { + RoleStateData stateData = stateDataOpt.get(); + if (this.epoch != null && stateData.epoch() < this.epoch) { context.reset(); - this.epoch = this.epoch == null ? 1 : this.epoch + 1; - context.epoch(this.epoch); - return new CandidateState(this.epoch); + Integer nextEpoch = this.epoch + 1; + context.epoch(nextEpoch); + return new CandidateState(nextEpoch); } - context.epoch(metaData.epoch()); - if (metaData.isMaster(context.node())) { - return new MasterState(metaData); + context.epoch(stateData.epoch()); + if (stateData.isMaster(context.node())) { + return new MasterState(stateData); } else { - return new WorkerState(metaData); + return new WorkerState(stateData); } } @@ -119,7 +136,7 @@ public Callback callback(StateMachineCallback callback) { private static class SafeState implements RoleState { - Integer epoch; + private final Integer epoch; public SafeState(Integer epoch) { this.epoch = epoch; @@ -139,22 +156,22 @@ public Callback callback(StateMachineCallback callback) { private static class MasterState implements RoleState { - MetaData metaData; + private final RoleStateData stateData; - public MasterState(MetaData metaData) { - this.metaData = metaData; + public MasterState(RoleStateData stateData) { + this.stateData = stateData; } @Override public RoleState transform(StateMachineContext context) { - this.metaData.increaseCount(); + this.stateData.increaseCount(); RoleState.heartBeatPark(context); - if (context.adapter().postDelyIfPresent(this.metaData, -1)) { + if (context.adapter().delayIfNodePresent(this.stateData, -1)) { return this; } context.reset(); - context.epoch(this.metaData.epoch()); - return new UnKnownState(this.metaData.epoch()).transform(context); + context.epoch(this.stateData.epoch()); + return new UnKnownState(this.stateData.epoch()).transform(context); } @Override @@ -165,21 +182,21 @@ public Callback callback(StateMachineCallback callback) { private static class WorkerState implements RoleState { - private MetaData metaData; + private RoleStateData stateData; private int count = 0; - public WorkerState(MetaData metaData) { - this.metaData = metaData; + public WorkerState(RoleStateData stateData) { + this.stateData = stateData; } @Override public RoleState transform(StateMachineContext context) { RoleState.heartBeatPark(context); - RoleState nextState = new UnKnownState(this.metaData.epoch()).transform(context); + RoleState nextState = new UnKnownState(this.stateData.epoch()).transform(context); if (nextState instanceof WorkerState) { this.merge((WorkerState) nextState); if (this.count > context.config().exceedsWorkerCount()) { - return new CandidateState(this.metaData.epoch() + 1); + return new CandidateState(this.stateData.epoch() + 1); } else { return this; } @@ -194,18 +211,18 @@ public Callback callback(StateMachineCallback callback) { } public void merge(WorkerState state) { - if (state.metaData.epoch() > this.metaData.epoch()) { + if (state.stateData.epoch() > this.stateData.epoch()) { this.count = 0; - this.metaData = state.metaData; - } else if (state.metaData.epoch() < this.metaData.epoch()){ + this.stateData = state.stateData; + } else if (state.stateData.epoch() < this.stateData.epoch()){ throw new IllegalStateException("Epoch must increase"); - } else if (state.metaData.epoch() == this.metaData.epoch() && - state.metaData.count() < this.metaData.count()) { + } else if (state.stateData.epoch() == this.stateData.epoch() && + state.stateData.count() < this.stateData.count()) { throw new IllegalStateException("Meta count must increase"); - } else if (state.metaData.epoch() == this.metaData.epoch() && - state.metaData.count() > this.metaData.count()) { + } else if (state.stateData.epoch() == this.stateData.epoch() && + state.stateData.count() > this.stateData.count()) { this.count = 0; - this.metaData = state.metaData; + this.stateData = state.stateData; } else { this.count++; } @@ -214,7 +231,7 @@ public void merge(WorkerState state) { private static class CandidateState implements RoleState { - Integer epoch; + private final Integer epoch; public CandidateState(Integer epoch) { this.epoch = epoch; @@ -224,13 +241,13 @@ public CandidateState(Integer epoch) { public RoleState transform(StateMachineContext context) { RoleState.randomPark(context); int epoch = this.epoch == null ? 1 : this.epoch; - MetaData metaData = new MetaData(context.config().node(), epoch); + RoleStateData stateData = new RoleStateData(context.config().node(), epoch); //failover to master success - context.epoch(metaData.epoch()); - if (context.adapter().postDelyIfPresent(metaData, -1)) { - return new MasterState(metaData); + context.epoch(stateData.epoch()); + if (context.adapter().delayIfNodePresent(stateData, -1)) { + return new MasterState(stateData); } else { - return new WorkerState(metaData); + return new WorkerState(stateData); } } @@ -267,7 +284,7 @@ public void epoch(Integer epoch) { } @Override - public MetaDataAdapter adapter() { + public RoleStataDataAdapter adapter() { return this.machine.adapter(); } @@ -287,7 +304,7 @@ public void reset() { } } - protected MetaDataAdapter adapter() { - return this.metaDataAdapter; + protected RoleStataDataAdapter adapter() { + return this.roleStataDataAdapter; } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleStataDataAdapter.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleStataDataAdapter.java new file mode 100644 index 0000000000..eabd17c444 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleStataDataAdapter.java @@ -0,0 +1,31 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.election; + +import java.util.Optional; + +public interface RoleStataDataAdapter { + + boolean delayIfNodePresent(RoleStateData metaData, long delaySecond); + + Optional queryWithDelay(long delaySecond); + + Optional query(); +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleStateData.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleStateData.java new file mode 100644 index 0000000000..db728d00c6 --- /dev/null +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleStateData.java @@ -0,0 +1,91 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.election; + +import java.util.Objects; + +public class RoleStateData { + + private String node; + private long count; + private int epoch; + + public RoleStateData(String node, int epoch) { + this(node, epoch, 1); + } + + public RoleStateData(String node, int epoch, long count) { + this.node = node; + this.epoch = epoch; + this.count = count; + } + + public void increaseCount() { + this.count++; + } + + public boolean isMaster(String node) { + return Objects.equals(this.node, node); + } + + public int epoch() { + return this.epoch; + } + + public long count() { + return this.count; + } + + public void count(long count) { + this.count = count; + } + + public String node() { + return this.node; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof RoleStateData)) { + return false; + } + RoleStateData metaData = (RoleStateData) o; + return count == metaData.count && + epoch == metaData.epoch && + Objects.equals(node, metaData.node); + } + + @Override + public int hashCode() { + return Objects.hash(node, count, epoch); + } + + @Override + public String toString() { + return "RoleStateData{" + + "node='" + node + '\'' + + ", count=" + count + + ", epoch=" + epoch + + '}'; + } +} diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java index b765c455c9..9b6b932a87 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java @@ -1,3 +1,22 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + package com.baidu.hugegraph.election; public interface StateMachineCallback { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java index 74b37f337d..69251cba6f 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java @@ -1,3 +1,22 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + package com.baidu.hugegraph.election; public interface StateMachineContext { @@ -12,7 +31,7 @@ public interface StateMachineContext { Config config(); - MetaDataAdapter adapter(); + RoleStataDataAdapter adapter(); void reset(); } diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java index 177aeb9d0e..f3214f569b 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java @@ -1,3 +1,22 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + package com.baidu.hugegraph.core; import java.util.ArrayList; @@ -17,8 +36,8 @@ import org.junit.Test; import com.baidu.hugegraph.election.Config; -import com.baidu.hugegraph.election.MetaData; -import com.baidu.hugegraph.election.MetaDataAdapter; +import com.baidu.hugegraph.election.RoleStateData; +import com.baidu.hugegraph.election.RoleStataDataAdapter; import com.baidu.hugegraph.election.RoleElectionStateMachine; import com.baidu.hugegraph.election.RoleElectionStateMachineImpl; import com.baidu.hugegraph.election.StateMachineCallback; @@ -29,6 +48,7 @@ public class RoleElectionStateMachineTest { public static class LogEntry { Integer epoch; + String node; Role role; @@ -175,28 +195,31 @@ public void error(StateMachineContext context, Throwable e) { } }; - final List metaDataLogs = Collections.synchronizedList(new ArrayList<>(100)); - final MetaDataAdapter adapter = new MetaDataAdapter() { + final List metaDataLogs = Collections.synchronizedList(new ArrayList<>(100)); + final RoleStataDataAdapter adapter = new RoleStataDataAdapter() { + volatile int epoch = 0; - Map data = new ConcurrentHashMap<>(); - MetaData copy(MetaData metaData) { - if (metaData == null) { + final Map data = new ConcurrentHashMap<>(); + + RoleStateData copy(RoleStateData stateData) { + if (stateData == null) { return null; } - return new MetaData(metaData.node(), metaData.epoch(), metaData.count()); + return new RoleStateData(stateData.node(), stateData.epoch(), stateData.count()); } + @Override - public boolean postDelyIfPresent(MetaData metaData, long delySecond) { - if (delySecond > 0) { - LockSupport.parkNanos(delySecond * 1_000_000_000); + public boolean delayIfNodePresent(RoleStateData stateData, long delaySecond) { + if (delaySecond > 0) { + LockSupport.parkNanos(delaySecond * 1_000_000_000); } - if (metaData.epoch() < this.epoch) { + if (stateData.epoch() < this.epoch) { return false; } - MetaData copy = this.copy(metaData); - MetaData newData = data.compute(copy.epoch(), (key, value) -> { + RoleStateData copy = this.copy(stateData); + RoleStateData newData = data.compute(copy.epoch(), (key, value) -> { if (copy.epoch() > this.epoch) { this.epoch = copy.epoch(); Assert.assertNull(value); @@ -223,13 +246,13 @@ public boolean postDelyIfPresent(MetaData metaData, long delySecond) { } @Override - public Optional queryDelay(long delySecond) { - LockSupport.parkNanos(delySecond * 1_000_000_000); + public Optional queryWithDelay(long delaySecond) { + LockSupport.parkNanos(delaySecond * 1_000_000_000); return Optional.ofNullable(this.copy(this.data.get(this.epoch))); } @Override - public Optional query() { + public Optional query() { return Optional.ofNullable(this.copy(this.data.get(this.epoch))); } }; From f3c5ab63c8face150c71238447db5b8ea5b8d53d Mon Sep 17 00:00:00 2001 From: zyxxoo <1318247699@qq.com> Date: Wed, 17 Aug 2022 09:40:45 +0800 Subject: [PATCH 6/9] improve code --- .../RoleElectionStateMachineImpl.java | 42 +++++++++--------- .../hugegraph/election/RoleStateData.java | 32 +++++++------- ...aAdapter.java => RoleTypeDataAdapter.java} | 6 +-- .../election/StateMachineCallback.java | 2 +- .../election/StateMachineContext.java | 2 +- .../core/RoleElectionStateMachineTest.java | 44 ++++++++----------- 6 files changed, 60 insertions(+), 68 deletions(-) rename hugegraph-core/src/main/java/com/baidu/hugegraph/election/{RoleStataDataAdapter.java => RoleTypeDataAdapter.java} (84%) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java index ffb535d233..d5546d10ce 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java @@ -27,15 +27,16 @@ public class RoleElectionStateMachineImpl implements RoleElectionStateMachine { - private volatile boolean shutdown = false; + private volatile boolean shutdown; private final Config config; private volatile RoleState state; - private final RoleStataDataAdapter roleStataDataAdapter; + private final RoleTypeDataAdapter roleTypeDataAdapter; - public RoleElectionStateMachineImpl(Config config, RoleStataDataAdapter adapter) { + public RoleElectionStateMachineImpl(Config config, RoleTypeDataAdapter adapter) { this.config = config; - this.roleStataDataAdapter = adapter; + this.roleTypeDataAdapter = adapter; this.state = new UnKnownState(null); + this.shutdown = false; } @Override @@ -58,7 +59,7 @@ public void apply(StateMachineCallback stateMachineCallback) { stateMachineCallback.error(context, e); failCount ++; if (failCount >= this.config.exceedsFailCount()) { - this.state = new SafeState(context.epoch()); + this.state = new AbdicationState(context.epoch()); Callback runnable = this.state.callback(stateMachineCallback); runnable.call(context); } @@ -103,7 +104,7 @@ public UnKnownState(Integer epoch) { @Override public RoleState transform(StateMachineContext context) { - RoleStataDataAdapter adapter = context.adapter(); + RoleTypeDataAdapter adapter = context.adapter(); Optional stateDataOpt = adapter.query(); if (!stateDataOpt.isPresent()) { context.reset(); @@ -134,11 +135,11 @@ public Callback callback(StateMachineCallback callback) { } } - private static class SafeState implements RoleState { + private static class AbdicationState implements RoleState { private final Integer epoch; - public SafeState(Integer epoch) { + public AbdicationState(Integer epoch) { this.epoch = epoch; } @@ -150,7 +151,7 @@ public RoleState transform(StateMachineContext context) { @Override public Callback callback(StateMachineCallback callback) { - return callback::safe; + return callback::abdication; } } @@ -164,9 +165,9 @@ public MasterState(RoleStateData stateData) { @Override public RoleState transform(StateMachineContext context) { - this.stateData.increaseCount(); + this.stateData.increaseClock(); RoleState.heartBeatPark(context); - if (context.adapter().delayIfNodePresent(this.stateData, -1)) { + if (context.adapter().updateIfNodePresent(this.stateData)) { return this; } context.reset(); @@ -183,10 +184,11 @@ public Callback callback(StateMachineCallback callback) { private static class WorkerState implements RoleState { private RoleStateData stateData; - private int count = 0; + private int count; public WorkerState(RoleStateData stateData) { this.stateData = stateData; + this.count = 0; } @Override @@ -217,10 +219,10 @@ public void merge(WorkerState state) { } else if (state.stateData.epoch() < this.stateData.epoch()){ throw new IllegalStateException("Epoch must increase"); } else if (state.stateData.epoch() == this.stateData.epoch() && - state.stateData.count() < this.stateData.count()) { - throw new IllegalStateException("Meta count must increase"); + state.stateData.clock() < this.stateData.clock()) { + throw new IllegalStateException("Clock must increase"); } else if (state.stateData.epoch() == this.stateData.epoch() && - state.stateData.count() > this.stateData.count()) { + state.stateData.clock() > this.stateData.clock()) { this.count = 0; this.stateData = state.stateData; } else { @@ -244,10 +246,10 @@ public RoleState transform(StateMachineContext context) { RoleStateData stateData = new RoleStateData(context.config().node(), epoch); //failover to master success context.epoch(stateData.epoch()); - if (context.adapter().delayIfNodePresent(stateData, -1)) { + if (context.adapter().updateIfNodePresent(stateData)) { return new MasterState(stateData); } else { - return new WorkerState(stateData); + return new UnKnownState(epoch).transform(context); } } @@ -284,7 +286,7 @@ public void epoch(Integer epoch) { } @Override - public RoleStataDataAdapter adapter() { + public RoleTypeDataAdapter adapter() { return this.machine.adapter(); } @@ -304,7 +306,7 @@ public void reset() { } } - protected RoleStataDataAdapter adapter() { - return this.roleStataDataAdapter; + protected RoleTypeDataAdapter adapter() { + return this.roleTypeDataAdapter; } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleStateData.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleStateData.java index db728d00c6..53112f020a 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleStateData.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleStateData.java @@ -24,21 +24,21 @@ public class RoleStateData { private String node; - private long count; + private long clock; private int epoch; public RoleStateData(String node, int epoch) { this(node, epoch, 1); } - public RoleStateData(String node, int epoch, long count) { + public RoleStateData(String node, int epoch, long clock) { this.node = node; this.epoch = epoch; - this.count = count; + this.clock = clock; } - public void increaseCount() { - this.count++; + public void increaseClock() { + this.clock++; } public boolean isMaster(String node) { @@ -49,12 +49,12 @@ public int epoch() { return this.epoch; } - public long count() { - return this.count; + public long clock() { + return this.clock; } - public void count(long count) { - this.count = count; + public void clock(long clock) { + this.clock = clock; } public String node() { @@ -62,29 +62,29 @@ public String node() { } @Override - public boolean equals(Object o) { - if (this == o) { + public boolean equals(Object obj) { + if (this == obj) { return true; } - if (!(o instanceof RoleStateData)) { + if (!(obj instanceof RoleStateData)) { return false; } - RoleStateData metaData = (RoleStateData) o; - return count == metaData.count && + RoleStateData metaData = (RoleStateData) obj; + return clock == metaData.clock && epoch == metaData.epoch && Objects.equals(node, metaData.node); } @Override public int hashCode() { - return Objects.hash(node, count, epoch); + return Objects.hash(node, clock, epoch); } @Override public String toString() { return "RoleStateData{" + "node='" + node + '\'' + - ", count=" + count + + ", clock=" + clock + ", epoch=" + epoch + '}'; } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleStataDataAdapter.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeDataAdapter.java similarity index 84% rename from hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleStataDataAdapter.java rename to hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeDataAdapter.java index eabd17c444..590902f218 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleStataDataAdapter.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeDataAdapter.java @@ -21,11 +21,9 @@ import java.util.Optional; -public interface RoleStataDataAdapter { +public interface RoleTypeDataAdapter { - boolean delayIfNodePresent(RoleStateData metaData, long delaySecond); - - Optional queryWithDelay(long delaySecond); + boolean updateIfNodePresent(RoleStateData stateData); Optional query(); } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java index 9b6b932a87..8403b59502 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineCallback.java @@ -29,7 +29,7 @@ public interface StateMachineCallback { void unknown(StateMachineContext context); - void safe(StateMachineContext context); + void abdication(StateMachineContext context); void error(StateMachineContext context, Throwable e); } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java index 69251cba6f..a3693f5fac 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/StateMachineContext.java @@ -31,7 +31,7 @@ public interface StateMachineContext { Config config(); - RoleStataDataAdapter adapter(); + RoleTypeDataAdapter adapter(); void reset(); } diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java index f3214f569b..e278717827 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java @@ -32,16 +32,16 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.LockSupport; -import org.junit.Assert; import org.junit.Test; import com.baidu.hugegraph.election.Config; import com.baidu.hugegraph.election.RoleStateData; -import com.baidu.hugegraph.election.RoleStataDataAdapter; +import com.baidu.hugegraph.election.RoleTypeDataAdapter; import com.baidu.hugegraph.election.RoleElectionStateMachine; import com.baidu.hugegraph.election.RoleElectionStateMachineImpl; import com.baidu.hugegraph.election.StateMachineCallback; import com.baidu.hugegraph.election.StateMachineContext; +import com.baidu.hugegraph.testutil.Assert; public class RoleElectionStateMachineTest { @@ -57,7 +57,7 @@ enum Role { master, worker, candidate, - safe, + abdication, unknown } @@ -68,11 +68,12 @@ public LogEntry(Integer epoch, String node, Role role) { } @Override - public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof LogEntry)) return false; - LogEntry logEntry = (LogEntry) o; - return Objects.equals(epoch, logEntry.epoch) && Objects.equals(node, logEntry.node) && role == logEntry.role; + public boolean equals(Object obj) { + if (this == obj) return true; + if (!(obj instanceof LogEntry)) return false; + LogEntry logEntry = (LogEntry) obj; + return Objects.equals(epoch, logEntry.epoch) && + Objects.equals(node, logEntry.node) && role == logEntry.role; } @Override @@ -180,10 +181,10 @@ public void unknown(StateMachineContext context) { } @Override - public void safe(StateMachineContext context) { + public void abdication(StateMachineContext context) { Integer epochId = context.epoch(); String node = context.node(); - logRecords.add(new LogEntry(epochId, node, LogEntry.Role.safe)); + logRecords.add(new LogEntry(epochId, node, LogEntry.Role.abdication)); if (logRecords.size() > MAX_COUNT) { context.stateMachine().shutdown(); } @@ -196,7 +197,7 @@ public void error(StateMachineContext context, Throwable e) { }; final List metaDataLogs = Collections.synchronizedList(new ArrayList<>(100)); - final RoleStataDataAdapter adapter = new RoleStataDataAdapter() { + final RoleTypeDataAdapter adapter = new RoleTypeDataAdapter() { volatile int epoch = 0; @@ -206,14 +207,11 @@ RoleStateData copy(RoleStateData stateData) { if (stateData == null) { return null; } - return new RoleStateData(stateData.node(), stateData.epoch(), stateData.count()); + return new RoleStateData(stateData.node(), stateData.epoch(), stateData.clock()); } @Override - public boolean delayIfNodePresent(RoleStateData stateData, long delaySecond) { - if (delaySecond > 0) { - LockSupport.parkNanos(delaySecond * 1_000_000_000); - } + public boolean updateIfNodePresent(RoleStateData stateData) { if (stateData.epoch() < this.epoch) { return false; } @@ -230,10 +228,10 @@ public boolean delayIfNodePresent(RoleStateData stateData, long delaySecond) { Assert.assertEquals(value.epoch(), copy.epoch()); if (Objects.equals(value.node(), copy.node()) && - value.count() <= copy.count()) { + value.clock() <= copy.clock()) { System.out.println("----2" + copy); metaDataLogs.add(copy); - if (value.count() == copy.count()) { + if (value.clock() == copy.clock()) { Exception e = new Exception("eq"); e.printStackTrace(); } @@ -245,12 +243,6 @@ public boolean delayIfNodePresent(RoleStateData stateData, long delaySecond) { return Objects.equals(newData, copy); } - @Override - public Optional queryWithDelay(long delaySecond) { - LockSupport.parkNanos(delaySecond * 1_000_000_000); - return Optional.ofNullable(this.copy(this.data.get(this.epoch))); - } - @Override public Optional query() { return Optional.ofNullable(this.copy(this.data.get(this.epoch))); @@ -308,7 +300,7 @@ public Optional query() { randomShutdown.start(); stop.await(); - Assert.assertTrue(logRecords.size() > 0); + Assert.assertGt(0, logRecords.size()); Map masters = new HashMap<>(); for (LogEntry entry: logRecords) { if (entry.role == LogEntry.Role.master) { @@ -319,6 +311,6 @@ public Optional query() { } } - Assert.assertTrue(masters.size() > 0); + Assert.assertGt(0, masters.size()); } } From 664c614370bcaacf77e78d9075ead35478addd9f Mon Sep 17 00:00:00 2001 From: zyxxoo <1318247699@qq.com> Date: Wed, 17 Aug 2022 11:14:57 +0800 Subject: [PATCH 7/9] improve code --- .../RoleElectionStateMachineImpl.java | 76 +++++++++---------- .../{RoleStateData.java => RoleTypeData.java} | 10 +-- .../election/RoleTypeDataAdapter.java | 4 +- .../core/RoleElectionStateMachineTest.java | 26 ++++--- 4 files changed, 60 insertions(+), 56 deletions(-) rename hugegraph-core/src/main/java/com/baidu/hugegraph/election/{RoleStateData.java => RoleTypeData.java} (89%) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java index d5546d10ce..f670b01001 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java @@ -105,27 +105,27 @@ public UnKnownState(Integer epoch) { @Override public RoleState transform(StateMachineContext context) { RoleTypeDataAdapter adapter = context.adapter(); - Optional stateDataOpt = adapter.query(); - if (!stateDataOpt.isPresent()) { + Optional roleTypeDataOpt = adapter.query(); + if (!roleTypeDataOpt.isPresent()) { context.reset(); Integer nextEpoch = this.epoch == null ? 1 : this.epoch + 1; context.epoch(nextEpoch); return new CandidateState(nextEpoch); } - RoleStateData stateData = stateDataOpt.get(); - if (this.epoch != null && stateData.epoch() < this.epoch) { + RoleTypeData roleTypeData = roleTypeDataOpt.get(); + if (this.epoch != null && roleTypeData.epoch() < this.epoch) { context.reset(); Integer nextEpoch = this.epoch + 1; context.epoch(nextEpoch); return new CandidateState(nextEpoch); } - context.epoch(stateData.epoch()); - if (stateData.isMaster(context.node())) { - return new MasterState(stateData); + context.epoch(roleTypeData.epoch()); + if (roleTypeData.isMaster(context.node())) { + return new MasterState(roleTypeData); } else { - return new WorkerState(stateData); + return new WorkerState(roleTypeData); } } @@ -157,22 +157,22 @@ public Callback callback(StateMachineCallback callback) { private static class MasterState implements RoleState { - private final RoleStateData stateData; + private final RoleTypeData roleTypeData; - public MasterState(RoleStateData stateData) { - this.stateData = stateData; + public MasterState(RoleTypeData roleTypeData) { + this.roleTypeData = roleTypeData; } @Override public RoleState transform(StateMachineContext context) { - this.stateData.increaseClock(); + this.roleTypeData.increaseClock(); RoleState.heartBeatPark(context); - if (context.adapter().updateIfNodePresent(this.stateData)) { + if (context.adapter().updateIfNodePresent(this.roleTypeData)) { return this; } context.reset(); - context.epoch(this.stateData.epoch()); - return new UnKnownState(this.stateData.epoch()).transform(context); + context.epoch(this.roleTypeData.epoch()); + return new UnKnownState(this.roleTypeData.epoch()).transform(context); } @Override @@ -183,22 +183,22 @@ public Callback callback(StateMachineCallback callback) { private static class WorkerState implements RoleState { - private RoleStateData stateData; - private int count; + private RoleTypeData roleTypeData; + private int clock; - public WorkerState(RoleStateData stateData) { - this.stateData = stateData; - this.count = 0; + public WorkerState(RoleTypeData roleTypeData) { + this.roleTypeData = roleTypeData; + this.clock = 0; } @Override public RoleState transform(StateMachineContext context) { RoleState.heartBeatPark(context); - RoleState nextState = new UnKnownState(this.stateData.epoch()).transform(context); + RoleState nextState = new UnKnownState(this.roleTypeData.epoch()).transform(context); if (nextState instanceof WorkerState) { this.merge((WorkerState) nextState); - if (this.count > context.config().exceedsWorkerCount()) { - return new CandidateState(this.stateData.epoch() + 1); + if (this.clock > context.config().exceedsWorkerCount()) { + return new CandidateState(this.roleTypeData.epoch() + 1); } else { return this; } @@ -213,20 +213,20 @@ public Callback callback(StateMachineCallback callback) { } public void merge(WorkerState state) { - if (state.stateData.epoch() > this.stateData.epoch()) { - this.count = 0; - this.stateData = state.stateData; - } else if (state.stateData.epoch() < this.stateData.epoch()){ + if (state.roleTypeData.epoch() > this.roleTypeData.epoch()) { + this.clock = 0; + this.roleTypeData = state.roleTypeData; + } else if (state.roleTypeData.epoch() < this.roleTypeData.epoch()){ throw new IllegalStateException("Epoch must increase"); - } else if (state.stateData.epoch() == this.stateData.epoch() && - state.stateData.clock() < this.stateData.clock()) { + } else if (state.roleTypeData.epoch() == this.roleTypeData.epoch() && + state.roleTypeData.clock() < this.roleTypeData.clock()) { throw new IllegalStateException("Clock must increase"); - } else if (state.stateData.epoch() == this.stateData.epoch() && - state.stateData.clock() > this.stateData.clock()) { - this.count = 0; - this.stateData = state.stateData; + } else if (state.roleTypeData.epoch() == this.roleTypeData.epoch() && + state.roleTypeData.clock() > this.roleTypeData.clock()) { + this.clock = 0; + this.roleTypeData = state.roleTypeData; } else { - this.count++; + this.clock++; } } } @@ -243,11 +243,11 @@ public CandidateState(Integer epoch) { public RoleState transform(StateMachineContext context) { RoleState.randomPark(context); int epoch = this.epoch == null ? 1 : this.epoch; - RoleStateData stateData = new RoleStateData(context.config().node(), epoch); + RoleTypeData roleTypeData = new RoleTypeData(context.config().node(), epoch); //failover to master success - context.epoch(stateData.epoch()); - if (context.adapter().updateIfNodePresent(stateData)) { - return new MasterState(stateData); + context.epoch(roleTypeData.epoch()); + if (context.adapter().updateIfNodePresent(roleTypeData)) { + return new MasterState(roleTypeData); } else { return new UnKnownState(epoch).transform(context); } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleStateData.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeData.java similarity index 89% rename from hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleStateData.java rename to hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeData.java index 53112f020a..aa60c47ace 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleStateData.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeData.java @@ -21,17 +21,17 @@ import java.util.Objects; -public class RoleStateData { +public class RoleTypeData { private String node; private long clock; private int epoch; - public RoleStateData(String node, int epoch) { + public RoleTypeData(String node, int epoch) { this(node, epoch, 1); } - public RoleStateData(String node, int epoch, long clock) { + public RoleTypeData(String node, int epoch, long clock) { this.node = node; this.epoch = epoch; this.clock = clock; @@ -66,10 +66,10 @@ public boolean equals(Object obj) { if (this == obj) { return true; } - if (!(obj instanceof RoleStateData)) { + if (!(obj instanceof RoleTypeData)) { return false; } - RoleStateData metaData = (RoleStateData) obj; + RoleTypeData metaData = (RoleTypeData) obj; return clock == metaData.clock && epoch == metaData.epoch && Objects.equals(node, metaData.node); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeDataAdapter.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeDataAdapter.java index 590902f218..41a2021f09 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeDataAdapter.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleTypeDataAdapter.java @@ -23,7 +23,7 @@ public interface RoleTypeDataAdapter { - boolean updateIfNodePresent(RoleStateData stateData); + boolean updateIfNodePresent(RoleTypeData stateData); - Optional query(); + Optional query(); } diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java index e278717827..fc0ad48c28 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java @@ -35,7 +35,7 @@ import org.junit.Test; import com.baidu.hugegraph.election.Config; -import com.baidu.hugegraph.election.RoleStateData; +import com.baidu.hugegraph.election.RoleTypeData; import com.baidu.hugegraph.election.RoleTypeDataAdapter; import com.baidu.hugegraph.election.RoleElectionStateMachine; import com.baidu.hugegraph.election.RoleElectionStateMachineImpl; @@ -69,8 +69,12 @@ public LogEntry(Integer epoch, String node, Role role) { @Override public boolean equals(Object obj) { - if (this == obj) return true; - if (!(obj instanceof LogEntry)) return false; + if (this == obj) { + return true; + } + if (!(obj instanceof LogEntry)) { + return false; + } LogEntry logEntry = (LogEntry) obj; return Objects.equals(epoch, logEntry.epoch) && Objects.equals(node, logEntry.node) && role == logEntry.role; @@ -196,28 +200,28 @@ public void error(StateMachineContext context, Throwable e) { } }; - final List metaDataLogs = Collections.synchronizedList(new ArrayList<>(100)); + final List metaDataLogs = Collections.synchronizedList(new ArrayList<>(100)); final RoleTypeDataAdapter adapter = new RoleTypeDataAdapter() { volatile int epoch = 0; - final Map data = new ConcurrentHashMap<>(); + final Map data = new ConcurrentHashMap<>(); - RoleStateData copy(RoleStateData stateData) { + RoleTypeData copy(RoleTypeData stateData) { if (stateData == null) { return null; } - return new RoleStateData(stateData.node(), stateData.epoch(), stateData.clock()); + return new RoleTypeData(stateData.node(), stateData.epoch(), stateData.clock()); } @Override - public boolean updateIfNodePresent(RoleStateData stateData) { + public boolean updateIfNodePresent(RoleTypeData stateData) { if (stateData.epoch() < this.epoch) { return false; } - RoleStateData copy = this.copy(stateData); - RoleStateData newData = data.compute(copy.epoch(), (key, value) -> { + RoleTypeData copy = this.copy(stateData); + RoleTypeData newData = data.compute(copy.epoch(), (key, value) -> { if (copy.epoch() > this.epoch) { this.epoch = copy.epoch(); Assert.assertNull(value); @@ -244,7 +248,7 @@ public boolean updateIfNodePresent(RoleStateData stateData) { } @Override - public Optional query() { + public Optional query() { return Optional.ofNullable(this.copy(this.data.get(this.epoch))); } }; From 048e782dd5aba51e04ff3868b9a4d53503ea07dd Mon Sep 17 00:00:00 2001 From: zyxxoo <1318247699@qq.com> Date: Wed, 17 Aug 2022 12:50:47 +0800 Subject: [PATCH 8/9] improve code --- .../election/RoleElectionStateMachineImpl.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java index f670b01001..016f51cb8d 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/election/RoleElectionStateMachineImpl.java @@ -35,7 +35,7 @@ public class RoleElectionStateMachineImpl implements RoleElectionStateMachine { public RoleElectionStateMachineImpl(Config config, RoleTypeDataAdapter adapter) { this.config = config; this.roleTypeDataAdapter = adapter; - this.state = new UnKnownState(null); + this.state = new UnknownState(null); this.shutdown = false; } @@ -94,11 +94,11 @@ private interface Callback { void call(StateMachineContext context); } - private static class UnKnownState implements RoleState { + private static class UnknownState implements RoleState { final Integer epoch; - public UnKnownState(Integer epoch) { + public UnknownState(Integer epoch) { this.epoch = epoch; } @@ -146,7 +146,7 @@ public AbdicationState(Integer epoch) { @Override public RoleState transform(StateMachineContext context) { RoleState.heartBeatPark(context); - return new UnKnownState(this.epoch).transform(context); + return new UnknownState(this.epoch).transform(context); } @Override @@ -172,7 +172,7 @@ public RoleState transform(StateMachineContext context) { } context.reset(); context.epoch(this.roleTypeData.epoch()); - return new UnKnownState(this.roleTypeData.epoch()).transform(context); + return new UnknownState(this.roleTypeData.epoch()).transform(context); } @Override @@ -194,7 +194,7 @@ public WorkerState(RoleTypeData roleTypeData) { @Override public RoleState transform(StateMachineContext context) { RoleState.heartBeatPark(context); - RoleState nextState = new UnKnownState(this.roleTypeData.epoch()).transform(context); + RoleState nextState = new UnknownState(this.roleTypeData.epoch()).transform(context); if (nextState instanceof WorkerState) { this.merge((WorkerState) nextState); if (this.clock > context.config().exceedsWorkerCount()) { @@ -249,7 +249,7 @@ public RoleState transform(StateMachineContext context) { if (context.adapter().updateIfNodePresent(roleTypeData)) { return new MasterState(roleTypeData); } else { - return new UnKnownState(epoch).transform(context); + return new UnknownState(epoch).transform(context); } } From 67a5f93b56629a632392dd122e3abf5521080a31 Mon Sep 17 00:00:00 2001 From: zyxxoo <1318247699@qq.com> Date: Tue, 30 Aug 2022 14:35:47 +0800 Subject: [PATCH 9/9] improve code --- .../core/RoleElectionStateMachineTest.java | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java index fc0ad48c28..6e8af972dc 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/RoleElectionStateMachineTest.java @@ -32,16 +32,15 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.LockSupport; -import org.junit.Test; - import com.baidu.hugegraph.election.Config; -import com.baidu.hugegraph.election.RoleTypeData; -import com.baidu.hugegraph.election.RoleTypeDataAdapter; import com.baidu.hugegraph.election.RoleElectionStateMachine; import com.baidu.hugegraph.election.RoleElectionStateMachineImpl; +import com.baidu.hugegraph.election.RoleTypeData; +import com.baidu.hugegraph.election.RoleTypeDataAdapter; import com.baidu.hugegraph.election.StateMachineCallback; import com.baidu.hugegraph.election.StateMachineContext; import com.baidu.hugegraph.testutil.Assert; +import org.junit.Test; public class RoleElectionStateMachineTest { @@ -150,7 +149,7 @@ public void master(StateMachineContext context) { if (logRecords.size() > MAX_COUNT) { context.stateMachine().shutdown(); } - System.out.println("----master " + node); + System.out.println("master node: " + node); masterNodes.add(node); } @@ -196,7 +195,7 @@ public void abdication(StateMachineContext context) { @Override public void error(StateMachineContext context, Throwable e) { - System.out.println("----" + context.node() + " " + e.getMessage()); + System.out.println("state machine error: node " + context.node() + " message " + e.getMessage()); } }; @@ -226,18 +225,17 @@ public boolean updateIfNodePresent(RoleTypeData stateData) { this.epoch = copy.epoch(); Assert.assertNull(value); metaDataLogs.add(copy); - System.out.println("----1" + copy); + System.out.println("The node " + copy + " become new master:"); return copy; } Assert.assertEquals(value.epoch(), copy.epoch()); if (Objects.equals(value.node(), copy.node()) && value.clock() <= copy.clock()) { - System.out.println("----2" + copy); + System.out.println("The master node " + copy + " keep heartbeat"); metaDataLogs.add(copy); if (value.clock() == copy.clock()) { - Exception e = new Exception("eq"); - e.printStackTrace(); + Assert.fail("Clock must increase when same epoch and node id"); } return copy; }