Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZOOKEEPER-2789 Reassign ZXID for solving 32bit overflow problem #2164

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.zookeeper.server.util.ZxidUtils;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
import java.util.HashSet;
Expand Down Expand Up @@ -116,8 +117,8 @@ public JsonGenerator(LogIterator iter) {
else if ((m = newElectionP.matcher(e.getEntry())).find()) {
Iterator<Integer> iterator = servers.iterator();
long zxid = Long.valueOf(m.group(2));
int count = (int)zxid;// & 0xFFFFFFFFL;
int epoch = (int)Long.rotateRight(zxid, 32);// >> 32;
int count = ZxidUtils.getCounterFromZxid(zxid);
int epoch = ZxidUtils.getEpochFromZxid(zxid);

if (leader != 0 && epoch > curEpoch) {
JsonNode stateChange = add("stateChange", e.getTimestamp(), leader, "INIT");
Expand Down Expand Up @@ -149,8 +150,8 @@ else if ((m = receivedProposalP.matcher(e.getEntry())).find()) {
int dst = e.getNode();
long epoch2 = Long.valueOf(m.group(3));

int count = (int)zxid;// & 0xFFFFFFFFL;
int epoch = (int)Long.rotateRight(zxid, 32);// >> 32;
int count = ZxidUtils.getCounterFromZxid(zxid);
int epoch = ZxidUtils.getEpochFromZxid(zxid);

if (leader != 0 && epoch > curEpoch) {
JsonNode stateChange = add("stateChange", e.getTimestamp(), leader, "INIT");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.jute.Record;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.metrics.MetricsContext;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.server.ExitCode;
import org.apache.zookeeper.server.FinalRequestProcessor;
import org.apache.zookeeper.server.Request;
Expand Down Expand Up @@ -200,7 +201,7 @@ protected void unregisterMetrics() {
private Request buildRequestToProcess(final TxnHeader hdr, final Record txn, final TxnDigest digest) {
final Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
request.setTxnDigest(digest);
if ((request.zxid & 0xffffffffL) != 0) {
if (ZxidUtils.getCounterFromZxid(request.zxid) != 0) {
pendingTxns.add(request);
}
return request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ public Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
this.self = self;
this.proposalStats = new BufferStats();

// A new Leader node has been elected, therefore the zxid bit length is synchronously increased.
ZxidUtils.setEpochHighPosition40();
self.setCurrentEpochPosition(40);

Set<InetSocketAddress> addresses;
if (self.getQuorumListenOnAllIPs()) {
addresses = self.getQuorumAddress().getWildcardAddresses();
Expand Down Expand Up @@ -662,7 +666,7 @@ void lead() throws IOException, InterruptedException {

newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);

if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
if (ZxidUtils.getCounterFromZxid(newLeaderProposal.packet.getZxid()) != 0) {
LOG.info("NEWLEADER proposal has Zxid of {}", Long.toHexString(newLeaderProposal.packet.getZxid()));
}

Expand Down Expand Up @@ -743,7 +747,7 @@ void lead() throws IOException, InterruptedException {
/**
* WARNING: do not use this for anything other than QA testing
* on a real cluster. Specifically to enable verification that quorum
* can handle the lower 32bit roll-over issue identified in
* can handle the lower 40bit roll-over issue identified in
* ZOOKEEPER-1277. Without this option it would take a very long
* time (on order of a month say) to see the 4 billion writes
* necessary to cause the roll-over to occur.
Expand All @@ -755,7 +759,7 @@ void lead() throws IOException, InterruptedException {
String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
if (initialZxid != null) {
long zxid = Long.parseLong(initialZxid);
zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
zk.setZxid(ZxidUtils.clearCounter(zk.getZxid()) | zxid);
}

if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
Expand Down Expand Up @@ -1058,7 +1062,7 @@ public synchronized void processAck(long sid, long zxid, SocketAddress followerA
LOG.trace("outstanding proposals all");
}

if ((zxid & 0xffffffffL) == 0) {
if (ZxidUtils.getCounterFromZxid(zxid) == 0) {
/*
* We no longer process NEWLEADER ack with this method. However,
* the learner sends an ack back to the leader after it gets
Expand Down Expand Up @@ -1291,11 +1295,11 @@ public Proposal propose(Request request) throws XidRolloverException {
ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
}
/**
* Address the rollover issue. All lower 32bits set indicate a new leader
* Address the rollover issue. All lower 40bits set indicate a new leader
* election. Force a re-election instead. See ZOOKEEPER-1277
*/
if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
String msg = "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
if (ZxidUtils.getCounterFromZxid(request.zxid) == ZxidUtils.getCounterLowPosition()) {
String msg = "zxid lower 40 bits have rolled over, forcing re-election, and therefore new epoch start";
shutdown(msg);
throw new XidRolloverException(msg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,18 +491,28 @@ protected long registerWithLeader(int pktType) throws IOException {
/*
* Add sid to payload
*/
LearnerInfo li = new LearnerInfo(self.getMyId(), 0x10000, self.getQuorumVerifier().getVersion());
LearnerInfo li = new LearnerInfo(self.getMyId(), 0x11000, self.getQuorumVerifier().getVersion());
ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li, "LearnerInfo");
qp.setData(bsid.toByteArray());

writePacket(qp, true);
readPacket(qp);
final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
if (qp.getType() == Leader.LEADERINFO) {
// we are connected to a 1.0 server so accept the new epoch and read the next packet
leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();

if (leaderProtocolVersion >= 0x11000) {
/**
* The Leader is a new version with adjusted zxid bit length,
* thus Followers and Observers also adjust their zxid bit length accordingly.
*/
ZxidUtils.setEpochHighPosition40();
self.setCurrentEpochPosition(40);
newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
}
byte[] epochBytes = new byte[4];
final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
if (newEpoch > self.getAcceptedEpoch()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,15 +525,16 @@ public void run() {
long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);

if (this.getVersion() < 0x10000) {
// we are going to have to extrapolate the epoch information
long epoch = ZxidUtils.getEpochFromZxid(zxid);
ss = new StateSummary(epoch, zxid);
// fake the message
learnerMaster.waitForEpochAck(this.getSid(), ss);
/**
* When the Leader switches to a new mode
* lower version Observers or Followers are not allowed to join the cluster, thus the zxid bit length has changed.
*/
if (this.getVersion() < 0x11000) {
LOG.error("sid:{} version too old", this.sid);
return;
} else {
byte[] ver = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);
ByteBuffer.wrap(ver).putInt(0x11000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The patch doesn't alter anything in the protocol, but you still have to increase protocol version number in order to reject old followers. Odd, but seams reasonable.

QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
oa.writeRecord(newEpochPacket, "packet");
messageTracker.trackSent(Leader.LEADERINFO);
Expand Down Expand Up @@ -597,13 +598,8 @@ public void run() {
// the version of this quorumVerifier will be set by leader.lead() in case
// the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
// we got here, so the version was set
if (getVersion() < 0x10000) {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null);
oa.writeRecord(newLeaderQP, "packet");
} else {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
queuedPackets.add(newLeaderQP);
}
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
queuedPackets.add(newLeaderQP);
bufferedOutput.flush();

// Start thread that blast packets in the queue to learner
Expand Down Expand Up @@ -789,7 +785,7 @@ boolean syncFollower(long peerLastZxid, LearnerMaster learnerMaster) {
* zxid in our history. In this case, we will ignore TRUNC logic and
* always send DIFF if we have old enough history
*/
boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
boolean isPeerNewEpochZxid = ZxidUtils.getCounterFromZxid(peerLastZxid) == 0;
// Keep track of the latest zxid which already queued
long currentZxid = peerLastZxid;
boolean needSnap = true;
Expand Down Expand Up @@ -949,7 +945,7 @@ boolean syncFollower(long peerLastZxid, LearnerMaster learnerMaster) {
* @return last zxid of the queued proposal
*/
protected long queueCommittedProposals(Iterator<Proposal> itr, long peerLastZxid, Long maxZxid, Long lastCommittedZxid) {
boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
boolean isPeerNewEpochZxid = ZxidUtils.getCounterFromZxid(peerLastZxid) == 0;
long queuedZxid = peerLastZxid;
// as we look through proposals, this variable keeps track of previous
// proposal Id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1209,6 +1209,25 @@ private void loadDataBase() {

// load the epochs
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
/**
* It is only after joining the cluster that we know whether the current use is 32-bit or 40-bit.
* Here, a new file is created locally to store the current number of epoch bits. This file will be created locally after upgrading the version to ensure a smooth joining of the cluster upon startup.
*/
try {
currentEpochPosition = readLongFromFile(CURRENT_EPOCH_POSITION_FILENAME);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need to store this information in a file?
Leader will set it automatically, learners will use the new version based on leader's version.

} catch (FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
currentEpochPosition = ZxidUtils.getEpochHighPosition();
LOG.info(
"{} not found! Creating with a reasonable default of {}. "
+ "This should only happen when you are upgrading your installation",
CURRENT_EPOCH_POSITION_FILENAME,
currentEpochPosition);
writeLongToFile(CURRENT_EPOCH_POSITION_FILENAME, currentEpochPosition);
}
ZxidUtils.setEpochHighPosition(currentEpochPosition);
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
try {
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
Expand Down Expand Up @@ -2298,11 +2317,14 @@ private long readLongFromFile(String name) throws IOException {

private long acceptedEpoch = -1;
private long currentEpoch = -1;
private long currentEpochPosition = -1;

public static final String CURRENT_EPOCH_FILENAME = "currentEpoch";

public static final String ACCEPTED_EPOCH_FILENAME = "acceptedEpoch";

public static final String CURRENT_EPOCH_POSITION_FILENAME = "currentEpochPosintion";

/**
* Write a long value to disk atomically. Either succeeds or an exception
* is thrown.
Expand Down Expand Up @@ -2341,6 +2363,11 @@ public void setCurrentEpoch(long e) throws IOException {

}

public void setCurrentEpochPosition(long e) throws IOException {
writeLongToFile(CURRENT_EPOCH_POSITION_FILENAME, e);
currentEpochPosition = e;
}

public void setAcceptedEpoch(long e) throws IOException {
writeLongToFile(ACCEPTED_EPOCH_FILENAME, e);
acceptedEpoch = e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,83 @@

package org.apache.zookeeper.server.util;

import java.util.concurrent.atomic.AtomicLong;

public class ZxidUtils {
// 40L
private static final long EPOCH_HIGH_POSITION32 = 32L;
private static final long EPOCH_HIGH_POSITION40 = 40L;

private static final long COUNTER_LOW_POSITION32 = 0xffffffffL;
private static final long CLEAR_EPOCH32 = 0x00000000ffffffffL;
private static final long CLEAR_COUNTER32 = 0xffffffff00000000L;

private static final long COUNTER_LOW_POSITION40 = 0xffffffffffL;
private static final long CLEAR_EPOCH40 = 0x000000ffffffffffL;
private static final long CLEAR_COUNTER40 = 0xffffff0000000000L;

private static AtomicLong EPOCH_HIGH_POSITION = new AtomicLong(EPOCH_HIGH_POSITION32);

private static AtomicLong COUNTER_LOW_POSITION = new AtomicLong(COUNTER_LOW_POSITION32);
private static AtomicLong CLEAR_EPOCH = new AtomicLong(CLEAR_EPOCH32);
private static AtomicLong CLEAR_COUNTER = new AtomicLong(CLEAR_COUNTER32);

static public long getEpochFromZxid(long zxid) {
return zxid >> EPOCH_HIGH_POSITION.get();
}

static public long getCounterFromZxid(long zxid) {
return zxid & COUNTER_LOW_POSITION.get();
}

public static long getEpochFromZxid(long zxid) {
return zxid >> 32L;
static public long makeZxid(long epoch, long counter) {
return (epoch << EPOCH_HIGH_POSITION.get()) | (counter & COUNTER_LOW_POSITION.get());
}
public static long getCounterFromZxid(long zxid) {
return zxid & 0xffffffffL;

static public long clearEpoch(long zxid) {
return zxid & CLEAR_EPOCH.get();
}
public static long makeZxid(long epoch, long counter) {
return (epoch << 32L) | (counter & 0xffffffffL);

static public long clearCounter(long zxid) {
return zxid & CLEAR_COUNTER.get();
}
public static String zxidToString(long zxid) {

static public String zxidToString(long zxid) {
return Long.toHexString(zxid);
}

public static long getEpochHighPosition() {
return EPOCH_HIGH_POSITION.get();
}

public static long getCounterLowPosition() {
return COUNTER_LOW_POSITION.get();
}

public static void setEpochHighPosition40() {
EPOCH_HIGH_POSITION.set(EPOCH_HIGH_POSITION40);

COUNTER_LOW_POSITION.set(COUNTER_LOW_POSITION40);
CLEAR_EPOCH.set(CLEAR_EPOCH40);
CLEAR_COUNTER.set(CLEAR_COUNTER40);
}

public static void setEpochHighPosition32() {
EPOCH_HIGH_POSITION.set(EPOCH_HIGH_POSITION32);

COUNTER_LOW_POSITION.set(COUNTER_LOW_POSITION32);
CLEAR_EPOCH.set(CLEAR_EPOCH32);
CLEAR_COUNTER.set(CLEAR_COUNTER32);
}

public static void setEpochHighPosition(long position) {
if (position == 32) {
setEpochHighPosition32();
} else if (position == 40) {
setEpochHighPosition40();
} else {
throw new IllegalArgumentException("Invalid epoch high position:" + position + ", should is 32 or 40.");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
import org.apache.zookeeper.test.ClientTest;
Expand Down Expand Up @@ -210,7 +211,7 @@ private void shutdown(int idx) throws Exception {

/** Reset the next zxid to be near epoch end */
private void adjustEpochNearEnd() {
zksLeader.setZxid((zksLeader.getZxid() & 0xffffffff00000000L) | 0xfffffffcL);
zksLeader.setZxid(ZxidUtils.clearCounter(zksLeader.getZxid()) | 0xfffffffffcL);
}

@AfterEach
Expand Down
Loading