Skip to content

Commit

Permalink
Merge pull request #1942 from cnauroth/ZOOKEEPER-4460
Browse files Browse the repository at this point in the history
ZOOKEEPER-4460: QuorumPeer overrides Thread.getId with different sema…

Signed-off-by: Enrico Olivelli <eolivelli@apache.org>
(cherry picked from commit cedf093)
(cherry picked from commit 941151c)
  • Loading branch information
cnauroth committed Nov 3, 2022
1 parent 32225e3 commit e002e08
Show file tree
Hide file tree
Showing 24 changed files with 99 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs)
QuorumPeer.ZabState zabState = peer.getZabState();
QuorumVerifier qv = peer.getQuorumVerifier();

QuorumPeer.QuorumServer voter = qv.getVotingMembers().get(peer.getId());
QuorumPeer.QuorumServer voter = qv.getVotingMembers().get(peer.getMyId());
boolean voting = (
voter != null
&& voter.addr.equals(peer.getQuorumAddress())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void processRequest(Request request) {
QuorumPeer self = leader.self;
if (self != null) {
request.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
leader.processAck(self.getId(), request.zxid, null);
leader.processAck(self.getMyId(), request.zxid, null);
} else {
LOG.error("Null QuorumPeer");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ public void run() {
QuorumVerifier curQV = self.getQuorumVerifier();
if (rqv.getVersion() > curQV.getVersion()) {
LOG.info("{} Received version: {} my version: {}",
self.getId(),
self.getMyId(),
Long.toHexString(rqv.getVersion()),
Long.toHexString(self.getQuorumVerifier().getVersion()));
if (self.getPeerState() == ServerState.LOOKING) {
Expand Down Expand Up @@ -355,7 +355,7 @@ public void run() {
sendqueue.offer(notmsg);
} else {
// Receive new message
LOG.debug("Receive new notification message. My id = {}", self.getId());
LOG.debug("Receive new notification message. My id = {}", self.getMyId());

// State of peer that sent this message
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
Expand Down Expand Up @@ -445,7 +445,7 @@ public void run() {

LOG.debug(
"Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}",
self.getId(),
self.getMyId(),
response.sid,
Long.toHexString(current.getZxid()),
current.getId(),
Expand Down Expand Up @@ -534,12 +534,12 @@ void process(ToSend m) {

this.ws = new WorkerSender(manager);

this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getMyId() + "]");
this.wsThread.setDaemon(true);

this.wr = new WorkerReceiver(manager);

this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getMyId() + "]");
this.wrThread.setDaemon(true);
}

Expand Down Expand Up @@ -662,7 +662,7 @@ private void leaveInstance(Vote v) {
"About to leave FLE instance: leader={}, zxid=0x{}, my id={}, my state={}",
v.getId(),
Long.toHexString(v.getZxid()),
self.getId(),
self.getMyId(),
self.getPeerState());
recvqueue.clear();
}
Expand Down Expand Up @@ -707,7 +707,7 @@ private void sendNotifications() {
Long.toHexString(proposedZxid),
Long.toHexString(logicalclock.get()),
sid,
self.getId(),
self.getMyId(),
Long.toHexString(proposedEpoch));

sendqueue.offer(notmsg);
Expand Down Expand Up @@ -799,7 +799,7 @@ protected boolean checkLeader(Map<Long, Vote> votes, long leader, long electionE
* from leader stating that it is leading, then predicate is false.
*/

if (leader != self.getId()) {
if (leader != self.getMyId()) {
if (votes.get(leader) == null) {
predicate = false;
} else if (votes.get(leader).getState() != ServerState.LEADING) {
Expand Down Expand Up @@ -838,10 +838,10 @@ public synchronized Vote getVote() {
*/
private ServerState learningState() {
if (self.getLearnerType() == LearnerType.PARTICIPANT) {
LOG.debug("I am a participant: {}", self.getId());
LOG.debug("I am a participant: {}", self.getMyId());
return ServerState.FOLLOWING;
} else {
LOG.debug("I am an observer: {}", self.getId());
LOG.debug("I am an observer: {}", self.getMyId());
return ServerState.OBSERVING;
}
}
Expand All @@ -852,8 +852,8 @@ private ServerState learningState() {
* @return long
*/
private long getInitId() {
if (self.getQuorumVerifier().getVotingMembers().containsKey(self.getId())) {
return self.getId();
if (self.getQuorumVerifier().getVotingMembers().containsKey(self.getMyId())) {
return self.getMyId();
} else {
return Long.MIN_VALUE;
}
Expand Down Expand Up @@ -896,7 +896,7 @@ private long getPeerEpoch() {
* the leadingVoteSet if it becomes the leader.
*/
private void setPeerState(long proposedLeader, SyncedLearnerTracker voteSet) {
ServerState ss = (proposedLeader == self.getId()) ? ServerState.LEADING : learningState();
ServerState ss = (proposedLeader == self.getMyId()) ? ServerState.LEADING : learningState();
self.setPeerState(ss);
if (ss == ServerState.LEADING) {
leadingVoteSet = voteSet;
Expand Down Expand Up @@ -944,7 +944,7 @@ public Vote lookForLeader() throws InterruptedException {

LOG.info(
"New election. My id = {}, proposed zxid=0x{}",
self.getId(),
self.getMyId(),
Long.toHexString(proposedZxid));
sendNotifications();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,8 @@ boolean isLearnerSynced(LearnerHandler peer) {
*/
public boolean isQuorumSynced(QuorumVerifier qv) {
HashSet<Long> ids = new HashSet<Long>();
if (qv.getVotingMembers().containsKey(self.getId())) {
ids.add(self.getId());
if (qv.getVotingMembers().containsKey(self.getMyId())) {
ids.add(self.getMyId());
}
synchronized (forwardingFollowers) {
for (LearnerHandler learnerHandler : forwardingFollowers) {
Expand Down Expand Up @@ -599,7 +599,7 @@ void lead() throws IOException, InterruptedException {
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();

long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
long epoch = getEpochToPropose(self.getMyId(), self.getAcceptedEpoch());

zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

Expand Down Expand Up @@ -654,13 +654,13 @@ void lead() throws IOException, InterruptedException {
// us. We do this by waiting for the NEWLEADER packet to get
// acknowledged

waitForEpochAck(self.getId(), leaderStateSummary);
waitForEpochAck(self.getMyId(), leaderStateSummary);
self.setCurrentEpoch(epoch);
self.setLeaderAddressAndId(self.getQuorumAddress(), self.getId());
self.setLeaderAddressAndId(self.getQuorumAddress(), self.getMyId());
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);

try {
waitForNewLeaderAck(self.getId(), zk.getZxid());
waitForNewLeaderAck(self.getMyId(), zk.getZxid());
} catch (InterruptedException e) {
shutdown("Waiting for a quorum of followers, only synced with sids: [ "
+ newLeaderProposal.ackSetsToString()
Expand Down Expand Up @@ -742,7 +742,7 @@ void lead() throws IOException, InterruptedException {
syncedAckSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}

syncedAckSet.addAck(self.getId());
syncedAckSet.addAck(self.getMyId());

for (LearnerHandler f : getLearners()) {
if (f.synced()) {
Expand Down Expand Up @@ -846,15 +846,15 @@ private long getDesignatedLeader(Proposal reconfigProposal, long zxid) {

//check if I'm in the new configuration with the same quorum address -
// if so, I'll remain the leader
if (newQVAcksetPair.getQuorumVerifier().getVotingMembers().containsKey(self.getId())
&& newQVAcksetPair.getQuorumVerifier().getVotingMembers().get(self.getId()).addr.equals(self.getQuorumAddress())) {
return self.getId();
if (newQVAcksetPair.getQuorumVerifier().getVotingMembers().containsKey(self.getMyId())
&& newQVAcksetPair.getQuorumVerifier().getVotingMembers().get(self.getMyId()).addr.equals(self.getQuorumAddress())) {
return self.getMyId();
}
// start with an initial set of candidates that are voters from new config that
// acknowledged the reconfig op (there must be a quorum). Choose one of them as
// current leader candidate
HashSet<Long> candidates = new HashSet<Long>(newQVAcksetPair.getAckset());
candidates.remove(self.getId()); // if we're here, I shouldn't be the leader
candidates.remove(self.getMyId()); // if we're here, I shouldn't be the leader
long curCandidate = candidates.iterator().next();

//go over outstanding ops in order, and try to find a candidate that acked the most ops.
Expand Down Expand Up @@ -936,7 +936,7 @@ public synchronized boolean tryToCommit(Proposal p, long zxid, SocketAddress fol

self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);

if (designatedLeader != self.getId()) {
if (designatedLeader != self.getMyId()) {
LOG.info(String.format("Committing a reconfiguration (reconfigEnabled=%s); this leader is not the designated "
+ "leader anymore, setting allowedToCommit=false", self.isReconfigEnabled()));
allowedToCommit = false;
Expand Down Expand Up @@ -1400,13 +1400,13 @@ public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws Interrupt
connectingFollowers.add(sid);
}
QuorumVerifier verifier = self.getQuorumVerifier();
if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers)) {
if (connectingFollowers.contains(self.getMyId()) && verifier.containsQuorum(connectingFollowers)) {
waitingForNewEpoch = false;
self.setAcceptedEpoch(epoch);
connectingFollowers.notifyAll();
} else {
long start = Time.currentElapsedTime();
if (sid == self.getId()) {
if (sid == self.getMyId()) {
timeStartWaitForEpoch = start;
}
long cur = start;
Expand Down Expand Up @@ -1452,7 +1452,7 @@ public void waitForEpochAck(long id, StateSummary ss) throws IOException, Interr
}
}
QuorumVerifier verifier = self.getQuorumVerifier();
if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) {
if (electingFollowers.contains(self.getMyId()) && verifier.containsQuorum(electingFollowers)) {
electionFinished = true;
electingFollowers.notifyAll();
} else {
Expand Down Expand Up @@ -1509,7 +1509,7 @@ private synchronized void startZkServer() {
Long designatedLeader = getDesignatedLeader(newLeaderProposal, zk.getZxid());

self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
if (designatedLeader != self.getId()) {
if (designatedLeader != self.getMyId()) {
LOG.warn("This leader is not the designated leader, it will be initialized with allowedToCommit = false");
allowedToCommit = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void createSessionTracker() {
this,
getZKDatabase().getSessionWithTimeOuts(),
tickTime,
self.getId(),
self.getMyId(),
self.areLocalSessionsEnabled(),
getZooKeeperServerListener());
}
Expand Down Expand Up @@ -291,7 +291,7 @@ public String getState() {
*/
@Override
public long getServerId() {
return self.getId();
return self.getMyId();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ protected long registerWithLeader(int pktType) throws IOException {
/*
* Add sid to payload
*/
LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
LearnerInfo li = new LearnerInfo(self.getMyId(), 0x10000, self.getQuorumVerifier().getVersion());
ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li, "LearnerInfo");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected Map<Long, Integer> getTouchSnapshot() {
*/
@Override
public long getServerId() {
return self.getId();
return self.getMyId();
}

@Override
Expand All @@ -80,7 +80,7 @@ public void createSessionTracker() {
this,
getZKDatabase().getSessionWithTimeOuts(),
this.tickTime,
self.getId(),
self.getMyId(),
self.areLocalSessionsEnabled(),
getZooKeeperServerListener());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public LocalPeerBean(QuorumPeer peer) {
}

public String getName() {
return "replica." + peer.getId();
return "replica." + peer.getMyId();
}

public boolean isHidden() {
Expand Down Expand Up @@ -119,12 +119,12 @@ public String getQuorumSystemInfo() {

@Override
public boolean isPartOfEnsemble() {
return peer.getView().containsKey(peer.getId());
return peer.getView().containsKey(peer.getMyId());
}

@Override
public boolean isLeader() {
return peer.isLeader(peer.getId());
return peer.isLeader(peer.getMyId());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class QuorumBean implements QuorumMXBean, ZKMBeanInfo {

public QuorumBean(QuorumPeer peer) {
this.peer = peer;
name = "ReplicatedServer_id" + peer.getId();
name = "ReplicatedServer_id" + peer.getMyId();
}

@Override
Expand Down
Loading

0 comments on commit e002e08

Please sign in to comment.