Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27984 NPE in MigrateReplicationQueueFromZkToTableProcedure reco… #5329

Merged
merged 2 commits into from
Jul 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp.ReplicationSyncUpToolInfo;
Expand Down Expand Up @@ -364,6 +365,9 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste

private RSGroupInfoManager rsGroupInfoManager;

private final ReplicationLogCleanerBarrier replicationLogCleanerBarrier =
new ReplicationLogCleanerBarrier();

// manager of replication
private ReplicationPeerManager replicationPeerManager;

Expand Down Expand Up @@ -4106,6 +4110,11 @@ public ReplicationPeerManager getReplicationPeerManager() {
return replicationPeerManager;
}

/** Returns the {@link ReplicationLogCleanerBarrier} instance held by this master. */
@Override
public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
  return this.replicationLogCleanerBarrier;
}

public HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>>
getReplicationLoad(ServerName[] serverNames) {
List<ReplicationPeerDescription> peerList = this.getReplicationPeerManager().listPeers(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
Expand Down Expand Up @@ -361,6 +362,12 @@ ReplicationPeerConfig getReplicationPeerConfig(String peerId)
*/
ReplicationPeerManager getReplicationPeerManager();

/**
 * Returns the {@link ReplicationLogCleanerBarrier}. It is exposed on MasterServices directly
 * because it is used in multiple places (for example by master procedures and by the
 * replication log cleaner), rather than being reached through another manager.
 */
ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier();

/**
* Returns the {@link SyncReplicationReplayWALManager}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ protected ReplicationPeerConfig getNewPeerConfig() {
@Override
protected void releaseLatch(MasterProcedureEnv env) {
if (cleanerDisabled) {
env.getReplicationPeerManager().getReplicationLogCleanerBarrier().enable();
env.getMasterServices().getReplicationLogCleanerBarrier().enable();
}
if (peerConfig.isSyncReplication()) {
env.getReplicationPeerManager().releaseSyncReplicationPeerLock();
Expand All @@ -97,7 +97,7 @@ protected void releaseLatch(MasterProcedureEnv env) {
@Override
protected void prePeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException, ProcedureSuspendedException {
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) {
throw suspend(env.getMasterConfiguration(),
backoff -> LOG.warn("LogCleaner is run at the same time when adding peer {}, sleep {} secs",
peerId, backoff / 1000));
Expand Down Expand Up @@ -142,7 +142,7 @@ protected void afterReplay(MasterProcedureEnv env) {
// when executing the procedure we will try to disable and acquire.
return;
}
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) {
throw new IllegalStateException("can not disable log cleaner, this should not happen");
}
cleanerDisabled = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private void shutdownExecutorService() {

private void disableReplicationLogCleaner(MasterProcedureEnv env)
throws ProcedureSuspendedException {
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) {
// it is not likely that we can reach here as we will schedule this procedure immediately
// after master restarting, where ReplicationLogCleaner should not have started its first run
// yet. But anyway, let's make the code more robust. And it is safe to wait a bit here since
Expand All @@ -130,7 +130,7 @@ private void disableReplicationLogCleaner(MasterProcedureEnv env)
}

/**
 * Re-enables the replication log cleaner by calling {@code enable()} on the
 * {@link ReplicationLogCleanerBarrier}.
 * <p>
 * NOTE(review): the two statements below look like the pre-change and post-change lines of one
 * diff hunk rendered together (barrier reached via ReplicationPeerManager vs. via
 * MasterServices); presumably only the MasterServices variant should remain -- confirm.
 */
private void enableReplicationLogCleaner(MasterProcedureEnv env) {
env.getReplicationPeerManager().getReplicationLogCleanerBarrier().enable();
env.getMasterServices().getReplicationLogCleanerBarrier().enable();
}

private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws ProcedureSuspendedException {
Expand Down Expand Up @@ -224,7 +224,7 @@ protected Flow executeFromState(MasterProcedureEnv env,
lockEntry = procLock.getLockEntry(getProcId());
} catch (IOException ioe) {
LOG.error("Error while acquiring execution lock for procedure {}"
+ " when trying to wake it up, aborting...", ioe);
+ " when trying to wake it up, aborting...", this, ioe);
env.getMasterServices().abort("Can not acquire procedure execution lock", e);
return;
}
Expand Down Expand Up @@ -304,7 +304,7 @@ protected void afterReplay(MasterProcedureEnv env) {
// when executing the procedure we will try to disable and acquire.
return;
}
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) {
throw new IllegalStateException("can not disable log cleaner, this should not happen");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
Expand Down Expand Up @@ -115,9 +114,6 @@ public class ReplicationPeerManager implements ConfigurationObserver {
// Only allow to add one sync replication peer concurrently
private final Semaphore syncReplicationPeerLock = new Semaphore(1);

private final ReplicationLogCleanerBarrier replicationLogCleanerBarrier =
new ReplicationLogCleanerBarrier();

private final String clusterId;

private volatile Configuration conf;
Expand Down Expand Up @@ -725,10 +721,6 @@ public void releaseSyncReplicationPeerLock() {
syncReplicationPeerLock.release();
}

/** Returns the {@link ReplicationLogCleanerBarrier} instance held by this peer manager. */
public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
  return this.replicationLogCleanerBarrier;
}

@Override
public void onConfigurationChange(Configuration conf) {
this.conf = conf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
// queue for a given peer, that's why we can use a String peerId as key instead of
// ReplicationQueueId.
private Map<ServerName, Map<String, Map<String, ReplicationGroupOffset>>> replicationOffsets;
private ReplicationLogCleanerBarrier barrier;
private ReplicationPeerManager rpm;
private Supplier<Set<ServerName>> getNotFullyDeadServers;

Expand All @@ -84,7 +85,7 @@ public void preClean() {
LOG.error("Error occurred while executing queueStorage.hasData()", e);
return;
}
canFilter = rpm.getReplicationLogCleanerBarrier().start();
canFilter = barrier.start();
if (canFilter) {
notFullyDeadServers = getNotFullyDeadServers.get();
peerIds = rpm.listPeers(null).stream().map(ReplicationPeerDescription::getPeerId)
Expand All @@ -98,7 +99,7 @@ public void preClean() {
allQueueData = rpm.getQueueStorage().listAllQueues();
} catch (ReplicationException e) {
LOG.error("Can not list all replication queues, give up cleaning", e);
rpm.getReplicationLogCleanerBarrier().stop();
barrier.stop();
canFilter = false;
notFullyDeadServers = null;
peerIds = null;
Expand All @@ -122,7 +123,7 @@ public void preClean() {
@Override
public void postClean() {
if (canFilter) {
rpm.getReplicationLogCleanerBarrier().stop();
barrier.stop();
canFilter = false;
// release memory
notFullyDeadServers = null;
Expand Down Expand Up @@ -244,6 +245,7 @@ public void init(Map<String, Object> params) {
Object master = params.get(HMaster.MASTER);
if (master != null && master instanceof MasterServices) {
MasterServices m = (MasterServices) master;
barrier = m.getReplicationLogCleanerBarrier();
rpm = m.getReplicationPeerManager();
getNotFullyDeadServers = () -> getNotFullyDeadServers(m);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
Expand Down Expand Up @@ -524,4 +525,9 @@ public boolean replicationPeerModificationSwitch(boolean on) throws IOException
public boolean isReplicationPeerModificationEnabled() {
// This implementation unconditionally returns false.
return false;
}

/**
 * This implementation does not supply a {@link ReplicationLogCleanerBarrier}: it always returns
 * {@code null}.
 */
@Override
public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,11 @@ public void beforeTest() throws Exception {

masterServices = mock(MasterServices.class);
when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection());
when(masterServices.getReplicationLogCleanerBarrier())
.thenReturn(new ReplicationLogCleanerBarrier());
ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
when(masterServices.getReplicationPeerManager()).thenReturn(rpm);
when(rpm.getQueueStorage()).thenReturn(queueStorage);
when(rpm.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
ServerManager sm = mock(ServerManager.class);
when(masterServices.getServerManager()).thenReturn(sm);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ public void testDisablePeerAndWaitStates() throws Exception {
EXTRA_REGION_SERVERS
.put(ServerName.valueOf("localhost", 54321, EnvironmentEdgeManager.currentTime()), metrics);

ReplicationLogCleanerBarrier barrier = UTIL.getHBaseCluster().getMaster()
.getReplicationPeerManager().getReplicationLogCleanerBarrier();
ReplicationLogCleanerBarrier barrier =
UTIL.getHBaseCluster().getMaster().getReplicationLogCleanerBarrier();
assertTrue(barrier.start());

ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ public class TestReplicationLogCleaner {
@Before
public void setUp() throws ReplicationException {
services = mock(MasterServices.class);
when(services.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
when(rpm.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
when(services.getReplicationPeerManager()).thenReturn(rpm);
when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class);
Expand Down Expand Up @@ -157,7 +157,7 @@ public void testNoConf() {

/**
 * Disables the {@link ReplicationLogCleanerBarrier} first, then runs the cleaner over a single
 * file and expects the result to be the empty list.
 * <p>
 * NOTE(review): the two assertTrue lines below look like the pre-change and post-change lines of
 * one diff hunk rendered together (barrier obtained from the ReplicationPeerManager mock vs.
 * from the MasterServices mock); presumably only the MasterServices variant should remain --
 * confirm.
 */
@Test
public void testCanNotFilter() {
assertTrue(services.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable());
assertTrue(services.getReplicationLogCleanerBarrier().disable());
List<FileStatus> files = Arrays.asList(new FileStatus());
assertSame(Collections.emptyList(), runCleaner(cleaner, files));
}
Expand Down