HDDS-11501. Improve logging in XceiverServerRatis (apache#7252)
jianghuazhu authored and sarvekshayr committed Oct 7, 2024
1 parent 20b067b commit ad982e9
Showing 2 changed files with 44 additions and 49 deletions.
ContainerStateMachine.java
@@ -83,6 +83,7 @@
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
+ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
@@ -1162,8 +1163,8 @@ public void evictStateMachineCache() {
}

@Override
- public void notifyFollowerSlowness(RoleInfoProto roleInfoProto) {
- ratisServer.handleNodeSlowness(gid, roleInfoProto);
+ public void notifyFollowerSlowness(RoleInfoProto roleInfoProto, RaftPeer follower) {
+ ratisServer.handleFollowerSlowness(gid, roleInfoProto, follower);
}

@Override
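The two-argument signature tracks Ratis, which now hands the slow RaftPeer to the notifyFollowerSlowness callback; as I read the Ratis leader event API, it fires on the leader when a follower has not responded within the configured slowness timeout. A minimal sketch, not part of this commit, of what the extra parameter enables:

    // Sketch only: with the RaftPeer in hand, the handler can name the slow
    // follower instead of describing just the local node's role.
    @Override
    public void notifyFollowerSlowness(RoleInfoProto roleInfoProto, RaftPeer follower) {
      LOG.warn("Follower {} is slow; local role is {}",
          follower.getId(), roleInfoProto.getRole());
      ratisServer.handleFollowerSlowness(gid, roleInfoProto, follower);
    }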
XceiverServerRatis.java
@@ -104,6 +104,7 @@
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
+ import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TraditionalBinaryPrefix;
@@ -161,19 +162,18 @@ private static long nextCallId() {
private int clientPort;
private int dataStreamPort;
private final RaftServer server;
+ private final String name;
private final List<ThreadPoolExecutor> chunkExecutors;
private final ContainerDispatcher dispatcher;
private final ContainerController containerController;
private final ClientId clientId = ClientId.randomId();
private final StateContext context;
- private final long nodeFailureTimeoutMs;
private boolean isStarted = false;
private final DatanodeDetails datanodeDetails;
private final ConfigurationSource conf;
// TODO: Remove the gids set when Ratis supports an api to query active
// pipelines
private final ConcurrentMap<RaftGroupId, ActivePipelineContext> activePipelines = new ConcurrentHashMap<>();
- private final RaftPeerId raftPeerId;
// Timeout used while calling submitRequest directly.
private final long requestTimeout;
private final boolean shouldDeleteRatisLogDirectory;
@@ -197,14 +197,14 @@ private XceiverServerRatis(HddsDatanodeService hddsDatanodeService, DatanodeDeta
this.context = context;
this.dispatcher = dispatcher;
this.containerController = containerController;
- this.raftPeerId = RatisHelper.toRaftPeerId(dd);
String threadNamePrefix = datanodeDetails.threadNamePrefix();
chunkExecutors = createChunkExecutors(conf, threadNamePrefix);
- nodeFailureTimeoutMs = ratisServerConfig.getFollowerSlownessTimeout();
shouldDeleteRatisLogDirectory =
ratisServerConfig.shouldDeleteRatisLogDirectory();

RaftProperties serverProperties = newRaftProperties();
+ final RaftPeerId raftPeerId = RatisHelper.toRaftPeerId(dd);
+ this.name = getClass().getSimpleName() + "(" + raftPeerId + ")";
this.server =
RaftServer.newBuilder().setServerId(raftPeerId)
.setProperties(serverProperties)
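For context, the new name field precomputes the prefix that every log statement below reuses. With an invented peer id it evaluates as follows (values are illustrative only):

    // getClass().getSimpleName() -> "XceiverServerRatis"
    // raftPeerId                 -> "943007f8-2eee-4b1a-9c3d-5a6f7e8d9c0b"
    // name -> "XceiverServerRatis(943007f8-2eee-4b1a-9c3d-5a6f7e8d9c0b)"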
@@ -474,7 +474,7 @@ private void setStateMachineDataConfigurations(RaftProperties properties) {

// NOTE : the default value for the retry count in ratis is -1,
// which means retry indefinitely.
- int syncTimeoutRetryDefault = (int) nodeFailureTimeoutMs /
+ final int syncTimeoutRetryDefault = (int) ratisServerConfig.getFollowerSlownessTimeout() /
dataSyncTimeout.toIntExact(TimeUnit.MILLISECONDS);
int numSyncRetries = conf.getInt(
OzoneConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES,
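With the nodeFailureTimeoutMs field gone, the retry default reads the follower slowness timeout straight from ratisServerConfig; the quotient itself is unchanged. A worked example, assuming illustrative values rather than the shipped defaults:

    // Assume followerSlownessTimeout = 300_000 ms and dataSyncTimeout = 10_000 ms:
    final int syncTimeoutRetryDefault = (int) (300_000L / 10_000L); // = 30 retries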
@@ -558,7 +558,7 @@ private static Parameters createTlsParameters(SecurityConfig conf,
@Override
public void start() throws IOException {
if (!isStarted) {
LOG.info("Starting {} {}", getClass().getSimpleName(), server.getId());
LOG.info("Starting {}", name);
for (ThreadPoolExecutor executor : chunkExecutors) {
executor.prestartAllCoreThreads();
}
@@ -581,19 +581,19 @@ public void start() {
}
}

- private int getRealPort(InetSocketAddress address, Port.Name name) {
+ private int getRealPort(InetSocketAddress address, Port.Name portName) {
int realPort = address.getPort();
- datanodeDetails.setPort(DatanodeDetails.newPort(name, realPort));
- LOG.info("{} {} is started using port {} for {}",
- getClass().getSimpleName(), server.getId(), realPort, name);
+ final Port port = DatanodeDetails.newPort(portName, realPort);
+ datanodeDetails.setPort(port);
+ LOG.info("{} is started using port {}", name, port);
return realPort;
}

@Override
public void stop() {
if (isStarted) {
try {
LOG.info("Stopping {} {}", getClass().getSimpleName(), server.getId());
LOG.info("Closing {}", name);
// shutdown server before the executors as while shutting down,
// some of the tasks would be executed using the executors.
server.close();
@@ -602,7 +602,7 @@ public void stop() {
}
isStarted = false;
} catch (IOException e) {
LOG.error("XceiverServerRatis Could not be stopped gracefully.", e);
LOG.error("Failed to close {}.", name, e);
}
}
}
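Taken together, the lifecycle messages now share the single name prefix. Assuming the Port value renders as name=number (and with an invented peer id and port), the output would look roughly like:

    Starting XceiverServerRatis(943007f8-...)
    XceiverServerRatis(943007f8-...) is started using port RATIS=9858
    Closing XceiverServerRatis(943007f8-...)
    Failed to close XceiverServerRatis(943007f8-...).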
@@ -706,45 +706,40 @@ private GroupInfoRequest createGroupInfoRequest(
nextCallId());
}

- private void handlePipelineFailure(RaftGroupId groupId,
- RoleInfoProto roleInfoProto) {
- String msg;
- UUID datanode = RatisHelper.toDatanodeId(roleInfoProto.getSelf());
- RaftPeerId id = RaftPeerId.valueOf(roleInfoProto.getSelf().getId());
+ private void handlePipelineFailure(RaftGroupId groupId, RoleInfoProto roleInfoProto, String reason) {
+ final RaftPeerId raftPeerId = RaftPeerId.valueOf(roleInfoProto.getSelf().getId());
+ Preconditions.assertEquals(getServer().getId(), raftPeerId, "raftPeerId");
+ final StringBuilder b = new StringBuilder()
+ .append(name).append(" with datanodeId ").append(RatisHelper.toDatanodeId(raftPeerId))
+ .append(" handlePipelineFailure for ").append(reason)
+ .append(": ").append(roleInfoProto.getRole())
+ .append(" elapsed time=").append(roleInfoProto.getRoleElapsedTimeMs()).append("ms");

switch (roleInfoProto.getRole()) {
case CANDIDATE:
msg = datanode + " is in candidate state for " +
roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs() + "ms";
final long lastLeaderElapsedTime = roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs();
b.append(", lastLeaderElapsedTime=").append(lastLeaderElapsedTime).append("ms");
break;
case FOLLOWER:
msg = datanode + " closes pipeline when installSnapshot from leader " +
"because leader snapshot doesn't contain any data to replay, " +
"all the log entries prior to the snapshot might have been purged." +
"So follower should not try to install snapshot from leader but" +
"can close the pipeline here. It's in follower state for " +
roleInfoProto.getRoleElapsedTimeMs() + "ms";
b.append(", outstandingOp=").append(roleInfoProto.getFollowerInfo().getOutstandingOp());
break;
case LEADER:
- StringBuilder sb = new StringBuilder();
- sb.append(datanode).append(" has not seen follower/s");
- for (RaftProtos.ServerRpcProto follower : roleInfoProto.getLeaderInfo()
- .getFollowerInfoList()) {
- if (follower.getLastRpcElapsedTimeMs() > nodeFailureTimeoutMs) {
- sb.append(" ").append(RatisHelper.toDatanodeId(follower.getId()))
- .append(" for ").append(follower.getLastRpcElapsedTimeMs())
- .append("ms");
- }
+ final long followerSlownessTimeoutMs = ratisServerConfig.getFollowerSlownessTimeout();
+ for (RaftProtos.ServerRpcProto follower : roleInfoProto.getLeaderInfo().getFollowerInfoList()) {
+ final long lastRpcElapsedTimeMs = follower.getLastRpcElapsedTimeMs();
+ final boolean slow = lastRpcElapsedTimeMs > followerSlownessTimeoutMs;
+ final RaftPeerId followerId = RaftPeerId.valueOf(follower.getId().getId());
+ b.append("\n Follower ").append(followerId)
+ .append(" with datanodeId ").append(RatisHelper.toDatanodeId(followerId))
+ .append(" is ").append(slow ? "slow" : "responding")
+ .append(" with lastRpcElapsedTime=").append(lastRpcElapsedTimeMs).append("ms");
}
- msg = sb.toString();
break;
default:
LOG.error("unknown state: {}", roleInfoProto.getRole());
throw new IllegalStateException("node" + id + " is in illegal role "
+ roleInfoProto.getRole());
throw new IllegalStateException("Unexpected role " + roleInfoProto.getRole());
}

- triggerPipelineClose(groupId, msg,
- ClosePipelineInfo.Reason.PIPELINE_FAILED);
+ triggerPipelineClose(groupId, b.toString(), ClosePipelineInfo.Reason.PIPELINE_FAILED);
}

private void triggerPipelineClose(RaftGroupId groupId, String detail,
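Tracing the appends above, a slow-follower event observed on the leader of a three-node pipeline would yield a message of roughly this shape (ids and times invented for illustration):

    XceiverServerRatis(p1) with datanodeId 1b2c... handlePipelineFailure for slow follower p2: LEADER elapsed time=65000ms
     Follower p2 with datanodeId 9d8e... is slow with lastRpcElapsedTime=61000ms
     Follower p3 with datanodeId 4f5a... is responding with lastRpcElapsedTime=120ms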
@@ -869,12 +864,12 @@ public void removeGroup(HddsProtos.PipelineID pipelineId)
processReply(reply);
}

- void handleNodeSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
- handlePipelineFailure(groupId, roleInfoProto);
+ void handleFollowerSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto, RaftPeer follower) {
+ handlePipelineFailure(groupId, roleInfoProto, "slow follower " + follower.getId());
}

void handleNoLeader(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
- handlePipelineFailure(groupId, roleInfoProto);
+ handlePipelineFailure(groupId, roleInfoProto, "no leader");
}

void handleApplyTransactionFailure(RaftGroupId groupId,
@@ -901,10 +896,9 @@ void handleApplyTransactionFailure(RaftGroupId groupId,
void handleInstallSnapshotFromLeader(RaftGroupId groupId,
RoleInfoProto roleInfoProto,
TermIndex firstTermIndexInLog) {
LOG.warn("Install snapshot notification received from Leader with " +
"termIndex: {}, terminating pipeline: {}",
LOG.warn("handleInstallSnapshotFromLeader for firstTermIndexInLog={}, terminating pipeline: {}",
firstTermIndexInLog, groupId);
- handlePipelineFailure(groupId, roleInfoProto);
+ handlePipelineFailure(groupId, roleInfoProto, "install snapshot notification");
}

/**
@@ -950,7 +944,7 @@ void handleLeaderChangedNotification(RaftGroupMemberId groupMemberId,
LOG.info("Leader change notification received for group: {} with new " +
"leaderId: {}", groupMemberId.getGroupId(), raftPeerId1);
// Save the reported leader to be sent with the report to SCM
- boolean leaderForGroup = this.raftPeerId.equals(raftPeerId1);
+ final boolean leaderForGroup = server.getId().equals(raftPeerId1);
activePipelines.compute(groupMemberId.getGroupId(),
(key, value) -> value == null ? new ActivePipelineContext(leaderForGroup, false) :
new ActivePipelineContext(leaderForGroup, value.isPendingClose()));
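The final hunk can drop the cached raftPeerId field because the running server reports its own id. In sketch form, the invariant this relies on (and which the new Preconditions check in handlePipelineFailure also asserts) is:

    // Sketch, assuming RaftServer.getId() returns the id passed to setServerId():
    final RaftServer server = RaftServer.newBuilder()
        .setServerId(raftPeerId)
        .setProperties(serverProperties)
        .build();
    Preconditions.assertEquals(raftPeerId, server.getId(), "raftPeerId");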
