Skip to content

Commit

Permalink
fix:small observer can't join quorum, so need use getView()
Browse files Browse the repository at this point in the history
  • Loading branch information
甘梓辰 committed Nov 5, 2024
1 parent 9d52264 commit 0e68d1c
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,9 @@ private boolean startConnection(Socket sock, Long sid) throws IOException {
return false;
}

// fix: small observer can't join quorum, so need use getView()
// authenticate learner
QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
QuorumPeer.QuorumServer qps = self.getView().get(sid);
if (qps != null) {
// TODO - investigate why reconfig makes qps null.
authLearner.authenticate(sock, qps.hostname);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper.server.quorum.auth;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.util.HashMap;
import java.util.Map;

public class QuorumAuthObserverTest extends QuorumAuthTestBase {

static {
String jaasEntries = "QuorumServer {\n"
+ " org.apache.zookeeper.server.auth.DigestLoginModule required\n"
+ " user_test=\"mypassword\";\n"
+ "};\n"
+ "QuorumLearner {\n"
+ " org.apache.zookeeper.server.auth.DigestLoginModule required\n"
+ " username=\"test\"\n"
+ " password=\"mypassword\";\n"
+ "};\n";
setupJaasConfig(jaasEntries);
}

@AfterEach
@Override
public void tearDown() throws Exception {
shutdownAll();
super.tearDown();
}

@AfterAll
public static void cleanup() {
cleanupJaasConfig();
}

/**
* Test to myid small observer join quorum.
* peer0 myid:11 participant
* peer1 myid:21 participant
* peer2 myid:1 observer
*/
@Test
@Timeout(value = 30)
public void testSmallObserverJoinSASLQuorum() throws Exception {
Map<String, String> authConfigs = new HashMap<>();
authConfigs.put(QuorumAuth.QUORUM_SASL_AUTH_ENABLED, "true");
authConfigs.put(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED, "true");
authConfigs.put(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED, "true");

// create quorum
StringBuilder connectStringBuilder = new StringBuilder();
int[] myidList = {11, 21, 1};
String[] roleList = {"participant", "participant", "observer"};
int[] clientPorts = startQuorum(3, connectStringBuilder, authConfigs, 3, false, myidList, roleList);

// small observer can't join quorum
String connectStr = "127.0.0.1:" + clientPorts[2];
CountdownWatcher watcher = new CountdownWatcher();
ZooKeeper zk = new ZooKeeper(connectStr, ClientBase.CONNECTION_TIMEOUT, watcher);
watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT);

zk.create("/foo", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,28 @@ protected String startQuorum(
}

protected int[] startQuorum(final int serverCount, StringBuilder connectStr, Map<String, String> authConfigs,
int authServerCount, boolean multiAddress) throws IOException {
int authServerCount, boolean multiAddress) throws IOException {
int[] defalutMyidList = new int[serverCount];
String[] defalutRoleList = new String[serverCount];
for (int i = 0; i < serverCount; i++) {
defalutMyidList[i] = i;
defalutRoleList[i] = "participant";
}
return startQuorum(serverCount, connectStr, authConfigs, authServerCount, multiAddress, defalutMyidList, defalutRoleList);
}

protected int[] startQuorum(final int serverCount, StringBuilder connectStr, Map<String, String> authConfigs,
int authServerCount, boolean multiAddress, int[] myidList, String[] roleList) throws IOException {
final int[] clientPorts = new int[serverCount];
StringBuilder sb = new StringBuilder();
for (int i = 0; i < serverCount; i++) {
clientPorts[i] = PortAssignment.unique();
String server = String.format("server.%d=localhost:%d:%d", i, PortAssignment.unique(), PortAssignment.unique());
String server = String.format("server.%d=localhost:%d:%d", myidList[i], PortAssignment.unique(), PortAssignment.unique());
if (multiAddress) {
server = server + String.format("|localhost:%d:%d", PortAssignment.unique(), PortAssignment.unique());
}
sb.append(server + ":participant\n");

sb.append(server + ":" + roleList[i] + "\n");
connectStr.append("127.0.0.1:" + clientPorts[i]);
if (i < serverCount - 1) {
connectStr.append(",");
Expand All @@ -122,11 +134,11 @@ protected int[] startQuorum(final int serverCount, StringBuilder connectStr, Map
// servers with authentication interfaces configured
int i = 0;
for (; i < authServerCount; i++) {
startServer(authConfigs, clientPorts, quorumCfg, i);
startServer(authConfigs, clientPorts, quorumCfg, i, myidList);
}
// servers without any authentication configured
for (int j = 0; j < serverCount - authServerCount; j++, i++) {
MainThread mthread = new MainThread(i, clientPorts[i], quorumCfg);
MainThread mthread = new MainThread(myidList[i], clientPorts[i], quorumCfg);
mt.add(mthread);
mthread.start();
}
Expand All @@ -137,8 +149,9 @@ private void startServer(
Map<String, String> authConfigs,
final int[] clientPorts,
String quorumCfg,
int i) throws IOException {
MainThread mthread = new MainThread(i, clientPorts[i], quorumCfg, authConfigs);
int i,
int[] myidList) throws IOException {
MainThread mthread = new MainThread(myidList[i], clientPorts[i], quorumCfg, authConfigs);
mt.add(mthread);
mthread.start();
}
Expand All @@ -149,6 +162,16 @@ protected void startServer(MainThread restartPeer, Map<String, String> authConfi
mthread.start();
}

protected void startServer(
Map<String, String> authConfigs,
final int clientPort,
String quorumCfg,
int myid) throws IOException {
MainThread mthread = new MainThread(myid, clientPort, quorumCfg, authConfigs);
mt.add(mthread);
mthread.start();
}

void shutdownAll() {
for (int i = 0; i < mt.size(); i++) {
shutdown(i);
Expand Down

0 comments on commit 0e68d1c

Please sign in to comment.