diff --git a/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/JsonGenerator.java b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/JsonGenerator.java index 19f919e5afe..26f8a3494b6 100644 --- a/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/JsonGenerator.java +++ b/zookeeper-contrib/zookeeper-contrib-loggraph/src/main/java/org/apache/zookeeper/graph/JsonGenerator.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.zookeeper.server.util.ZxidUtils; import java.util.regex.Pattern; import java.util.regex.Matcher; import java.util.HashSet; @@ -116,8 +117,8 @@ public JsonGenerator(LogIterator iter) { else if ((m = newElectionP.matcher(e.getEntry())).find()) { Iterator iterator = servers.iterator(); long zxid = Long.valueOf(m.group(2)); - int count = (int)zxid;// & 0xFFFFFFFFL; - int epoch = (int)Long.rotateRight(zxid, 32);// >> 32; + int count = ZxidUtils.getCounterFromZxid(zxid); + int epoch = ZxidUtils.getEpochFromZxid(zxid); if (leader != 0 && epoch > curEpoch) { JsonNode stateChange = add("stateChange", e.getTimestamp(), leader, "INIT"); @@ -149,8 +150,8 @@ else if ((m = receivedProposalP.matcher(e.getEntry())).find()) { int dst = e.getNode(); long epoch2 = Long.valueOf(m.group(3)); - int count = (int)zxid;// & 0xFFFFFFFFL; - int epoch = (int)Long.rotateRight(zxid, 32);// >> 32; + int count = ZxidUtils.getCounterFromZxid(zxid); + int epoch = ZxidUtils.getEpochFromZxid(zxid); if (leader != 0 && epoch > curEpoch) { JsonNode stateChange = add("stateChange", e.getTimestamp(), leader, "INIT"); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java index 5af114dccd1..aa065af17ff 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java @@ -25,6 +25,7 @@ import org.apache.jute.Record; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.metrics.MetricsContext; +import org.apache.zookeeper.server.util.ZxidUtils; import org.apache.zookeeper.server.ExitCode; import org.apache.zookeeper.server.FinalRequestProcessor; import org.apache.zookeeper.server.Request; @@ -200,7 +201,7 @@ protected void unregisterMetrics() { private Request buildRequestToProcess(final TxnHeader hdr, final Record txn, final TxnDigest digest) { final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()); request.setTxnDigest(digest); - if ((request.zxid & 0xffffffffL) != 0) { + if (ZxidUtils.getCounterFromZxid(request.zxid) != 0) { pendingTxns.add(request); } return request; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java index 0b57bb1824a..0ba6c63ddca 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java @@ -335,6 +335,10 @@ public Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException { this.self = self; this.proposalStats = new BufferStats(); + // A new Leader node has been elected, therefore the zxid bit length is synchronously increased. + ZxidUtils.setEpochHighPosition40(); + self.setCurrentEpochPosition(40); + Set addresses; if (self.getQuorumListenOnAllIPs()) { addresses = self.getQuorumAddress().getWildcardAddresses(); @@ -662,7 +666,7 @@ void lead() throws IOException, InterruptedException { newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null); - if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) { + if (ZxidUtils.getCounterFromZxid(newLeaderProposal.packet.getZxid()) != 0) { LOG.info("NEWLEADER proposal has Zxid of {}", Long.toHexString(newLeaderProposal.packet.getZxid())); } @@ -743,7 +747,7 @@ void lead() throws IOException, InterruptedException { /** * WARNING: do not use this for anything other than QA testing * on a real cluster. Specifically to enable verification that quorum - * can handle the lower 32bit roll-over issue identified in + * can handle the lower 40bit roll-over issue identified in * ZOOKEEPER-1277. Without this option it would take a very long * time (on order of a month say) to see the 4 billion writes * necessary to cause the roll-over to occur. @@ -755,7 +759,7 @@ void lead() throws IOException, InterruptedException { String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid"); if (initialZxid != null) { long zxid = Long.parseLong(initialZxid); - zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid); + zk.setZxid(ZxidUtils.clearCounter(zk.getZxid()) | zxid); } if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) { @@ -1058,7 +1062,7 @@ public synchronized void processAck(long sid, long zxid, SocketAddress followerA LOG.trace("outstanding proposals all"); } - if ((zxid & 0xffffffffL) == 0) { + if (ZxidUtils.getCounterFromZxid(zxid) == 0) { /* * We no longer process NEWLEADER ack with this method. However, * the learner sends an ack back to the leader after it gets @@ -1291,11 +1295,11 @@ public Proposal propose(Request request) throws XidRolloverException { ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue()); } /** - * Address the rollover issue. All lower 32bits set indicate a new leader + * Address the rollover issue. All lower 40bits set indicate a new leader * election. Force a re-election instead. See ZOOKEEPER-1277 */ - if ((request.zxid & 0xffffffffL) == 0xffffffffL) { - String msg = "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start"; + if (ZxidUtils.getCounterFromZxid(request.zxid) == ZxidUtils.getCounterLowPosition()) { + String msg = "zxid lower 40 bits have rolled over, forcing re-election, and therefore new epoch start"; shutdown(msg); throw new XidRolloverException(msg); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index e3bd13d1165..df7a32e4db4 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -491,7 +491,7 @@ protected long registerWithLeader(int pktType) throws IOException { /* * Add sid to payload */ - LearnerInfo li = new LearnerInfo(self.getMyId(), 0x10000, self.getQuorumVerifier().getVersion()); + LearnerInfo li = new LearnerInfo(self.getMyId(), 0x11000, self.getQuorumVerifier().getVersion()); ByteArrayOutputStream bsid = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid); boa.writeRecord(li, "LearnerInfo"); @@ -499,10 +499,20 @@ protected long registerWithLeader(int pktType) throws IOException { writePacket(qp, true); readPacket(qp); - final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); + long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); if (qp.getType() == Leader.LEADERINFO) { // we are connected to a 1.0 server so accept the new epoch and read the next packet leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt(); + + if (leaderProtocolVersion >= 0x11000) { + /** + * The Leader is a new version with adjusted zxid bit length, + * thus Followers and Observers also adjust their zxid bit length accordingly. + */ + ZxidUtils.setEpochHighPosition40(); + self.setCurrentEpochPosition(40); + newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); + } byte[] epochBytes = new byte[4]; final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes); if (newEpoch > self.getAcceptedEpoch()) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java index 049336a16ee..475c039407c 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java @@ -525,15 +525,16 @@ public void run() { long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch); long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0); - if (this.getVersion() < 0x10000) { - // we are going to have to extrapolate the epoch information - long epoch = ZxidUtils.getEpochFromZxid(zxid); - ss = new StateSummary(epoch, zxid); - // fake the message - learnerMaster.waitForEpochAck(this.getSid(), ss); + /** + * When the Leader switches to a new mode + * lower version Observers or Followers are not allowed to join the cluster, thus the zxid bit length has changed. + */ + if (this.getVersion() < 0x11000) { + LOG.error("sid:{} version too old", this.sid); + return; } else { byte[] ver = new byte[4]; - ByteBuffer.wrap(ver).putInt(0x10000); + ByteBuffer.wrap(ver).putInt(0x11000); QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null); oa.writeRecord(newEpochPacket, "packet"); messageTracker.trackSent(Leader.LEADERINFO); @@ -597,13 +598,8 @@ public void run() { // the version of this quorumVerifier will be set by leader.lead() in case // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if // we got here, so the version was set - if (getVersion() < 0x10000) { - QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null); - oa.writeRecord(newLeaderQP, "packet"); - } else { - QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null); - queuedPackets.add(newLeaderQP); - } + QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null); + queuedPackets.add(newLeaderQP); bufferedOutput.flush(); // Start thread that blast packets in the queue to learner @@ -789,7 +785,7 @@ boolean syncFollower(long peerLastZxid, LearnerMaster learnerMaster) { * zxid in our history. In this case, we will ignore TRUNC logic and * always send DIFF if we have old enough history */ - boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0; + boolean isPeerNewEpochZxid = ZxidUtils.getCounterFromZxid(peerLastZxid) == 0; // Keep track of the latest zxid which already queued long currentZxid = peerLastZxid; boolean needSnap = true; @@ -949,7 +945,7 @@ boolean syncFollower(long peerLastZxid, LearnerMaster learnerMaster) { * @return last zxid of the queued proposal */ protected long queueCommittedProposals(Iterator itr, long peerLastZxid, Long maxZxid, Long lastCommittedZxid) { - boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0; + boolean isPeerNewEpochZxid = ZxidUtils.getCounterFromZxid(peerLastZxid) == 0; long queuedZxid = peerLastZxid; // as we look through proposals, this variable keeps track of previous // proposal Id. diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java index aba7a294630..20762c591e0 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -1209,6 +1209,25 @@ private void loadDataBase() { // load the epochs long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid; + /** + * It is only after joining the cluster that we know whether the current use is 32-bit or 40-bit. + * Here, a new file is created locally to store the current number of epoch bits. This file will be created locally after upgrading the version to ensure a smooth joining of the cluster upon startup. + */ + try { + currentEpochPosition = readLongFromFile(CURRENT_EPOCH_POSITION_FILENAME); + } catch (FileNotFoundException e) { + // pick a reasonable epoch number + // this should only happen once when moving to a + // new code version + currentEpochPosition = ZxidUtils.getEpochHighPosition(); + LOG.info( + "{} not found! Creating with a reasonable default of {}. " + + "This should only happen when you are upgrading your installation", + CURRENT_EPOCH_POSITION_FILENAME, + currentEpochPosition); + writeLongToFile(CURRENT_EPOCH_POSITION_FILENAME, currentEpochPosition); + } + ZxidUtils.setEpochHighPosition(currentEpochPosition); long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid); try { currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME); @@ -2298,11 +2317,14 @@ private long readLongFromFile(String name) throws IOException { private long acceptedEpoch = -1; private long currentEpoch = -1; + private long currentEpochPosition = -1; public static final String CURRENT_EPOCH_FILENAME = "currentEpoch"; public static final String ACCEPTED_EPOCH_FILENAME = "acceptedEpoch"; + public static final String CURRENT_EPOCH_POSITION_FILENAME = "currentEpochPosintion"; + /** * Write a long value to disk atomically. Either succeeds or an exception * is thrown. @@ -2341,6 +2363,11 @@ public void setCurrentEpoch(long e) throws IOException { } + public void setCurrentEpochPosition(long e) throws IOException { + writeLongToFile(CURRENT_EPOCH_POSITION_FILENAME, e); + currentEpochPosition = e; + } + public void setAcceptedEpoch(long e) throws IOException { writeLongToFile(ACCEPTED_EPOCH_FILENAME, e); acceptedEpoch = e; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ZxidUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ZxidUtils.java index b3b6935d5c7..a59dd8443f1 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ZxidUtils.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/ZxidUtils.java @@ -18,19 +18,83 @@ package org.apache.zookeeper.server.util; +import java.util.concurrent.atomic.AtomicLong; + public class ZxidUtils { + // 40L + private static final long EPOCH_HIGH_POSITION32 = 32L; + private static final long EPOCH_HIGH_POSITION40 = 40L; + + private static final long COUNTER_LOW_POSITION32 = 0xffffffffL; + private static final long CLEAR_EPOCH32 = 0x00000000ffffffffL; + private static final long CLEAR_COUNTER32 = 0xffffffff00000000L; + + private static final long COUNTER_LOW_POSITION40 = 0xffffffffffL; + private static final long CLEAR_EPOCH40 = 0x000000ffffffffffL; + private static final long CLEAR_COUNTER40 = 0xffffff0000000000L; + + private static AtomicLong EPOCH_HIGH_POSITION = new AtomicLong(EPOCH_HIGH_POSITION32); + + private static AtomicLong COUNTER_LOW_POSITION = new AtomicLong(COUNTER_LOW_POSITION32); + private static AtomicLong CLEAR_EPOCH = new AtomicLong(CLEAR_EPOCH32); + private static AtomicLong CLEAR_COUNTER = new AtomicLong(CLEAR_COUNTER32); + + static public long getEpochFromZxid(long zxid) { + return zxid >> EPOCH_HIGH_POSITION.get(); + } + + static public long getCounterFromZxid(long zxid) { + return zxid & COUNTER_LOW_POSITION.get(); + } - public static long getEpochFromZxid(long zxid) { - return zxid >> 32L; + static public long makeZxid(long epoch, long counter) { + return (epoch << EPOCH_HIGH_POSITION.get()) | (counter & COUNTER_LOW_POSITION.get()); } - public static long getCounterFromZxid(long zxid) { - return zxid & 0xffffffffL; + + static public long clearEpoch(long zxid) { + return zxid & CLEAR_EPOCH.get(); } - public static long makeZxid(long epoch, long counter) { - return (epoch << 32L) | (counter & 0xffffffffL); + + static public long clearCounter(long zxid) { + return zxid & CLEAR_COUNTER.get(); } - public static String zxidToString(long zxid) { + + static public String zxidToString(long zxid) { return Long.toHexString(zxid); } + public static long getEpochHighPosition() { + return EPOCH_HIGH_POSITION.get(); + } + + public static long getCounterLowPosition() { + return COUNTER_LOW_POSITION.get(); + } + + public static void setEpochHighPosition40() { + EPOCH_HIGH_POSITION.set(EPOCH_HIGH_POSITION40); + + COUNTER_LOW_POSITION.set(COUNTER_LOW_POSITION40); + CLEAR_EPOCH.set(CLEAR_EPOCH40); + CLEAR_COUNTER.set(CLEAR_COUNTER40); + } + + public static void setEpochHighPosition32() { + EPOCH_HIGH_POSITION.set(EPOCH_HIGH_POSITION32); + + COUNTER_LOW_POSITION.set(COUNTER_LOW_POSITION32); + CLEAR_EPOCH.set(CLEAR_EPOCH32); + CLEAR_COUNTER.set(CLEAR_COUNTER32); + } + + public static void setEpochHighPosition(long position) { + if (position == 32) { + setEpochHighPosition32(); + } else if (position == 40) { + setEpochHighPosition40(); + } else { + throw new IllegalArgumentException("Invalid epoch high position:" + position + ", should is 32 or 40."); + } + } + } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java index 031ccc2f7da..ae77af66cbf 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZxidRolloverTest.java @@ -28,6 +28,7 @@ import org.apache.zookeeper.ZKTestCase; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.server.util.ZxidUtils; import org.apache.zookeeper.test.ClientBase; import org.apache.zookeeper.test.ClientBase.CountdownWatcher; import org.apache.zookeeper.test.ClientTest; @@ -210,7 +211,7 @@ private void shutdown(int idx) throws Exception { /** Reset the next zxid to be near epoch end */ private void adjustEpochNearEnd() { - zksLeader.setZxid((zksLeader.getZxid() & 0xffffffff00000000L) | 0xfffffffcL); + zksLeader.setZxid(ZxidUtils.clearCounter(zksLeader.getZxid()) | 0xfffffffffcL); } @AfterEach diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java index 43202716d2b..1470397d997 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java @@ -479,6 +479,60 @@ public void testNewEpochZxid() throws Exception { } + /** + * Test cases when learner has new-epcoh zxid + * (zxid & 0xffffffffffL) == 0; + */ + @Test + public void testNewEpochZxidWithTxnlogOnly() throws Exception { + long peerZxid; + db.txnLog.add(createProposal(getZxid(1, 1))); + db.txnLog.add(createProposal(getZxid(2, 1))); + db.txnLog.add(createProposal(getZxid(2, 2))); + db.txnLog.add(createProposal(getZxid(4, 1))); + + // After leader election, lastProcessedZxid will point to new epoch + db.lastProcessedZxid = getZxid(6, 0); + + // Peer has zxid of epoch 3 + peerZxid = getZxid(3, 0); + assertFalse(learnerHandler.syncFollower(peerZxid, leader)); + // We send DIFF to (6,0) and forward any packet starting at (4,1) + assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(4, 1)); + // DIFF + 1 proposals + 1 commit + assertEquals(3, learnerHandler.getQueuedPackets().size()); + queuedPacketMatches(new long[] { getZxid(4, 1)}); + reset(); + + // Peer has zxid of epoch 4 + peerZxid = getZxid(4, 0); + assertFalse(learnerHandler.syncFollower(peerZxid, leader)); + // We send DIFF to (6,0) and forward any packet starting at (4,1) + assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(4, 1)); + // DIFF + 1 proposals + 1 commit + assertEquals(3, learnerHandler.getQueuedPackets().size()); + queuedPacketMatches(new long[] { getZxid(4, 1)}); + reset(); + + // Peer has zxid of epoch 5 + peerZxid = getZxid(5, 0); + assertFalse(learnerHandler.syncFollower(peerZxid, leader)); + // We send DIFF to (6,0) and forward any packet starting at (5,0) + assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(5, 0)); + // DIFF only + assertEquals(1, learnerHandler.getQueuedPackets().size()); + reset(); + + // Peer has zxid of epoch 6 + peerZxid = getZxid(6, 0); + assertFalse(learnerHandler.syncFollower(peerZxid, leader)); + // We send DIFF to (6,0) and forward any packet starting at (6, 0) + assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(6, 0)); + // DIFF only + assertEquals(1, learnerHandler.getQueuedPackets().size()); + reset(); + } + /** * Test cases when there is a duplicate txn in the committedLog. This * should never happen unless there is a bug in initialization code diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java index 76a678f501c..fdb787a0190 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -516,7 +516,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l, long assertEquals(1, l.self.getCurrentEpoch()); /* we test a normal run. everything should work out well. */ - LearnerInfo li = new LearnerInfo(1, 0x10000, 0); + LearnerInfo li = new LearnerInfo(1, 0x11000, 0); byte[] liBytes = RequestRecord.fromRecord(li).readBytes(); QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 1, liBytes, null); oa.writeRecord(qp, null); @@ -524,7 +524,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l, long readPacketSkippingPing(ia, qp); assertEquals(Leader.LEADERINFO, qp.getType()); assertEquals(ZxidUtils.makeZxid(2, 0), qp.getZxid()); - assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); + assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x11000); assertEquals(2, l.self.getAcceptedEpoch()); assertEquals(1, l.self.getCurrentEpoch()); @@ -594,14 +594,14 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) assertEquals(qp.getZxid(), 0); LearnerInfo learnInfo = new LearnerInfo(); ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo); - assertEquals(learnInfo.getProtocolVersion(), 0x10000); + assertEquals(learnInfo.getProtocolVersion(), 0x11000); assertEquals(learnInfo.getServerid(), 0); // We are simulating an established leader, so the epoch is 1 qp.setType(Leader.LEADERINFO); qp.setZxid(ZxidUtils.makeZxid(1, 0)); byte[] protoBytes = new byte[4]; - ByteBuffer.wrap(protoBytes).putInt(0x10000); + ByteBuffer.wrap(protoBytes).putInt(0x11000); qp.setData(protoBytes); oa.writeRecord(qp, null); @@ -728,14 +728,14 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f) assertEquals(qp.getZxid(), 0); LearnerInfo learnInfo = new LearnerInfo(); ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo); - assertEquals(learnInfo.getProtocolVersion(), 0x10000); + assertEquals(learnInfo.getProtocolVersion(), 0x11000); assertEquals(learnInfo.getServerid(), 0); // We are simulating an established leader, so the epoch is 1 qp.setType(Leader.LEADERINFO); qp.setZxid(ZxidUtils.makeZxid(1, 0)); byte[] protoBytes = new byte[4]; - ByteBuffer.wrap(protoBytes).putInt(0x10000); + ByteBuffer.wrap(protoBytes).putInt(0x11000); qp.setData(protoBytes); oa.writeRecord(qp, null); @@ -828,7 +828,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro assertEquals(0, l.self.getCurrentEpoch()); /* we test a normal run. everything should work out well. */ - LearnerInfo li = new LearnerInfo(1, 0x10000, 0); + LearnerInfo li = new LearnerInfo(1, 0x11000, 0); byte[] liBytes = RequestRecord.fromRecord(li).readBytes(); QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null); oa.writeRecord(qp, null); @@ -836,7 +836,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro readPacketSkippingPing(ia, qp); assertEquals(Leader.LEADERINFO, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); - assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); + assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x11000); assertEquals(1, l.self.getAcceptedEpoch()); assertEquals(0, l.self.getCurrentEpoch()); @@ -868,7 +868,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro assertEquals(0, l.self.getAcceptedEpoch()); assertEquals(0, l.self.getCurrentEpoch()); - LearnerInfo li = new LearnerInfo(1, 0x10000, 0); + LearnerInfo li = new LearnerInfo(1, 0x11000, 0); byte[] liBytes = RequestRecord.fromRecord(li).readBytes(); QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null); oa.writeRecord(qp, null); @@ -876,7 +876,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro readPacketSkippingPing(ia, qp); assertEquals(Leader.LEADERINFO, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); - assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); + assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x11000); assertEquals(1, l.self.getAcceptedEpoch()); assertEquals(0, l.self.getCurrentEpoch()); @@ -958,14 +958,14 @@ public void converseWithObserver(InputArchive ia, OutputArchive oa, Observer o) assertEquals(qp.getZxid(), 0); LearnerInfo learnInfo = new LearnerInfo(); ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo); - assertEquals(learnInfo.getProtocolVersion(), 0x10000); + assertEquals(learnInfo.getProtocolVersion(), 0x11000); assertEquals(learnInfo.getServerid(), 0); // We are simulating an established leader, so the epoch is 1 qp.setType(Leader.LEADERINFO); qp.setZxid(ZxidUtils.makeZxid(1, 0)); byte[] protoBytes = new byte[4]; - ByteBuffer.wrap(protoBytes).putInt(0x10000); + ByteBuffer.wrap(protoBytes).putInt(0x11000); qp.setData(protoBytes); oa.writeRecord(qp, null); @@ -1070,7 +1070,7 @@ public void testLeaderBehind(@TempDir File testData) throws Exception { testLeaderConversation(new LeaderConversation() { public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException { /* we test a normal run. everything should work out well. */ - LearnerInfo li = new LearnerInfo(1, 0x10000, 0); + LearnerInfo li = new LearnerInfo(1, 0x11000, 0); byte[] liBytes = RequestRecord.fromRecord(li).readBytes(); /* we are going to say we last acked epoch 20 */ QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, ZxidUtils.makeZxid(20, 0), liBytes, null); @@ -1078,7 +1078,7 @@ public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) thro readPacketSkippingPing(ia, qp); assertEquals(Leader.LEADERINFO, qp.getType()); assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid()); - assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); + assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x11000); qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null); oa.writeRecord(qp, null); readPacketSkippingPing(ia, qp); @@ -1107,14 +1107,14 @@ public void testAbandonBeforeACKEpoch(@TempDir File testData) throws Exception { testLeaderConversation(new LeaderConversation() { public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws IOException, InterruptedException { /* we test a normal run. everything should work out well. */ - LearnerInfo li = new LearnerInfo(1, 0x10000, 0); + LearnerInfo li = new LearnerInfo(1, 0x11000, 0); byte[] liBytes = RequestRecord.fromRecord(li).readBytes(); QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0, liBytes, null); oa.writeRecord(qp, null); readPacketSkippingPing(ia, qp); assertEquals(Leader.LEADERINFO, qp.getType()); assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid()); - assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x10000); + assertEquals(ByteBuffer.wrap(qp.getData()).getInt(), 0x11000); Thread.sleep(l.self.getInitLimit() * l.self.getTickTime() + 5000); // The leader didn't get a quorum of acks - make sure that leader's current epoch is not advanced diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java index d9d67c4023b..72cb114e7b8 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java @@ -43,6 +43,7 @@ import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.quorum.Leader; +import org.apache.zookeeper.server.util.ZxidUtils; import org.apache.zookeeper.test.ClientBase.CountdownWatcher; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -562,8 +563,8 @@ private static TestableZooKeeper createTestableClient( private void verifyState(QuorumUtil qu, int index, Leader leader) { LOG.info("Verifying state"); assertTrue(qu.getPeer(index).peer.follower != null, "Not following"); - long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L); - long epochL = (leader.getEpoch() >> 32L); + long epochF = ZxidUtils.getEpochFromZxid(qu.getPeer(index).peer.getActiveServer().getZxid()); + long epochL = ZxidUtils.getEpochFromZxid(leader.getEpoch()); assertTrue(epochF == epochL, "Zxid: " + qu.getPeer(index).peer.getActiveServer().getZKDatabase().getDataTreeLastProcessedZxid() + "Current epoch: " + epochF); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java index 02b8ff6ef15..a9d50ce2f1d 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java @@ -934,7 +934,7 @@ public void testInitialConfigHasPositiveVersion() throws Exception { String configStr = testServerHasConfig(zkArr[i], null, null); QuorumVerifier qv = qu.getPeer(i).peer.configFromString(configStr); long version = qv.getVersion(); - assertTrue(version == 0x100000000L); + assertTrue(version == 0x10000000000L); } }