Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-11501. Improve logging in XceiverServerRatis #7252

Merged
merged 2 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
Expand Down Expand Up @@ -1162,8 +1163,8 @@ public void evictStateMachineCache() {
}

@Override
public void notifyFollowerSlowness(RoleInfoProto roleInfoProto) {
ratisServer.handleNodeSlowness(gid, roleInfoProto);
public void notifyFollowerSlowness(RoleInfoProto roleInfoProto, RaftPeer follower) {
ratisServer.handleFollowerSlowness(gid, roleInfoProto, follower);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TraditionalBinaryPrefix;
Expand Down Expand Up @@ -161,19 +162,18 @@ private static long nextCallId() {
private int clientPort;
private int dataStreamPort;
private final RaftServer server;
private final String name;
private final List<ThreadPoolExecutor> chunkExecutors;
private final ContainerDispatcher dispatcher;
private final ContainerController containerController;
private final ClientId clientId = ClientId.randomId();
private final StateContext context;
private final long nodeFailureTimeoutMs;
private boolean isStarted = false;
private final DatanodeDetails datanodeDetails;
private final ConfigurationSource conf;
// TODO: Remove the gids set when Ratis supports an api to query active
// pipelines
private final ConcurrentMap<RaftGroupId, ActivePipelineContext> activePipelines = new ConcurrentHashMap<>();
private final RaftPeerId raftPeerId;
// Timeout used while calling submitRequest directly.
private final long requestTimeout;
private final boolean shouldDeleteRatisLogDirectory;
Expand All @@ -197,14 +197,14 @@ private XceiverServerRatis(HddsDatanodeService hddsDatanodeService, DatanodeDeta
this.context = context;
this.dispatcher = dispatcher;
this.containerController = containerController;
this.raftPeerId = RatisHelper.toRaftPeerId(dd);
String threadNamePrefix = datanodeDetails.threadNamePrefix();
chunkExecutors = createChunkExecutors(conf, threadNamePrefix);
nodeFailureTimeoutMs = ratisServerConfig.getFollowerSlownessTimeout();
shouldDeleteRatisLogDirectory =
ratisServerConfig.shouldDeleteRatisLogDirectory();

RaftProperties serverProperties = newRaftProperties();
final RaftPeerId raftPeerId = RatisHelper.toRaftPeerId(dd);
this.name = getClass().getSimpleName() + "(" + raftPeerId + ")";
this.server =
RaftServer.newBuilder().setServerId(raftPeerId)
.setProperties(serverProperties)
Expand Down Expand Up @@ -474,7 +474,7 @@ private void setStateMachineDataConfigurations(RaftProperties properties) {

// NOTE : the default value for the retry count in ratis is -1,
// which means retry indefinitely.
int syncTimeoutRetryDefault = (int) nodeFailureTimeoutMs /
final int syncTimeoutRetryDefault = (int) ratisServerConfig.getFollowerSlownessTimeout() /
dataSyncTimeout.toIntExact(TimeUnit.MILLISECONDS);
int numSyncRetries = conf.getInt(
OzoneConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES,
Expand Down Expand Up @@ -558,7 +558,7 @@ private static Parameters createTlsParameters(SecurityConfig conf,
@Override
public void start() throws IOException {
if (!isStarted) {
LOG.info("Starting {} {}", getClass().getSimpleName(), server.getId());
LOG.info("Starting {}", name);
for (ThreadPoolExecutor executor : chunkExecutors) {
executor.prestartAllCoreThreads();
}
Expand All @@ -581,19 +581,19 @@ public void start() throws IOException {
}
}

private int getRealPort(InetSocketAddress address, Port.Name name) {
private int getRealPort(InetSocketAddress address, Port.Name portName) {
int realPort = address.getPort();
datanodeDetails.setPort(DatanodeDetails.newPort(name, realPort));
LOG.info("{} {} is started using port {} for {}",
getClass().getSimpleName(), server.getId(), realPort, name);
final Port port = DatanodeDetails.newPort(portName, realPort);
datanodeDetails.setPort(port);
LOG.info("{} is started using port {}", name, port);
return realPort;
}

@Override
public void stop() {
if (isStarted) {
try {
LOG.info("Stopping {} {}", getClass().getSimpleName(), server.getId());
LOG.info("Closing {}", name);
// shutdown server before the executors as while shutting down,
// some of the tasks would be executed using the executors.
server.close();
Expand All @@ -602,7 +602,7 @@ public void stop() {
}
isStarted = false;
} catch (IOException e) {
LOG.error("XceiverServerRatis Could not be stopped gracefully.", e);
LOG.error("Failed to close {}.", name, e);
}
}
}
Expand Down Expand Up @@ -706,45 +706,40 @@ private GroupInfoRequest createGroupInfoRequest(
nextCallId());
}

private void handlePipelineFailure(RaftGroupId groupId,
RoleInfoProto roleInfoProto) {
String msg;
UUID datanode = RatisHelper.toDatanodeId(roleInfoProto.getSelf());
RaftPeerId id = RaftPeerId.valueOf(roleInfoProto.getSelf().getId());
private void handlePipelineFailure(RaftGroupId groupId, RoleInfoProto roleInfoProto, String reason) {
final RaftPeerId raftPeerId = RaftPeerId.valueOf(roleInfoProto.getSelf().getId());
Preconditions.assertEquals(getServer().getId(), raftPeerId, "raftPeerId");
final StringBuilder b = new StringBuilder()
.append(name).append(" with datanodeId ").append(RatisHelper.toDatanodeId(raftPeerId))
.append("handlePipelineFailure ").append(" for ").append(reason)
.append(": ").append(roleInfoProto.getRole())
.append(" elapsed time=").append(roleInfoProto.getRoleElapsedTimeMs()).append("ms");

switch (roleInfoProto.getRole()) {
case CANDIDATE:
msg = datanode + " is in candidate state for " +
roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs() + "ms";
final long lastLeaderElapsedTime = roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs();
b.append(", lastLeaderElapsedTime=").append(lastLeaderElapsedTime).append("ms");
break;
case FOLLOWER:
msg = datanode + " closes pipeline when installSnapshot from leader " +
"because leader snapshot doesn't contain any data to replay, " +
"all the log entries prior to the snapshot might have been purged." +
"So follower should not try to install snapshot from leader but" +
"can close the pipeline here. It's in follower state for " +
roleInfoProto.getRoleElapsedTimeMs() + "ms";
b.append(", outstandingOp=").append(roleInfoProto.getFollowerInfo().getOutstandingOp());
break;
case LEADER:
StringBuilder sb = new StringBuilder();
sb.append(datanode).append(" has not seen follower/s");
for (RaftProtos.ServerRpcProto follower : roleInfoProto.getLeaderInfo()
.getFollowerInfoList()) {
if (follower.getLastRpcElapsedTimeMs() > nodeFailureTimeoutMs) {
sb.append(" ").append(RatisHelper.toDatanodeId(follower.getId()))
.append(" for ").append(follower.getLastRpcElapsedTimeMs())
.append("ms");
}
final long followerSlownessTimeoutMs = ratisServerConfig.getFollowerSlownessTimeout();
for (RaftProtos.ServerRpcProto follower : roleInfoProto.getLeaderInfo().getFollowerInfoList()) {
final long lastRpcElapsedTimeMs = follower.getLastRpcElapsedTimeMs();
final boolean slow = lastRpcElapsedTimeMs > followerSlownessTimeoutMs;
final RaftPeerId followerId = RaftPeerId.valueOf(follower.getId().getId());
b.append("\n Follower ").append(followerId)
.append(" with datanodeId ").append(RatisHelper.toDatanodeId(followerId))
.append(" is ").append(slow ? "slow" : " responding")
.append(" with lastRpcElapsedTime=").append(lastRpcElapsedTimeMs).append("ms");
}
msg = sb.toString();
break;
default:
LOG.error("unknown state: {}", roleInfoProto.getRole());
throw new IllegalStateException("node" + id + " is in illegal role "
+ roleInfoProto.getRole());
throw new IllegalStateException("Unexpected role " + roleInfoProto.getRole());
}

triggerPipelineClose(groupId, msg,
ClosePipelineInfo.Reason.PIPELINE_FAILED);
triggerPipelineClose(groupId, b.toString(), ClosePipelineInfo.Reason.PIPELINE_FAILED);
}

private void triggerPipelineClose(RaftGroupId groupId, String detail,
Expand Down Expand Up @@ -869,12 +864,12 @@ public void removeGroup(HddsProtos.PipelineID pipelineId)
processReply(reply);
}

void handleNodeSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
handlePipelineFailure(groupId, roleInfoProto);
void handleFollowerSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto, RaftPeer follower) {
handlePipelineFailure(groupId, roleInfoProto, "slow follower " + follower.getId());
}

void handleNoLeader(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
handlePipelineFailure(groupId, roleInfoProto);
handlePipelineFailure(groupId, roleInfoProto, "no leader");
}

void handleApplyTransactionFailure(RaftGroupId groupId,
Expand All @@ -901,10 +896,9 @@ void handleApplyTransactionFailure(RaftGroupId groupId,
void handleInstallSnapshotFromLeader(RaftGroupId groupId,
RoleInfoProto roleInfoProto,
TermIndex firstTermIndexInLog) {
LOG.warn("Install snapshot notification received from Leader with " +
"termIndex: {}, terminating pipeline: {}",
LOG.warn("handleInstallSnapshotFromLeader for firstTermIndexInLog={}, terminating pipeline: {}",
firstTermIndexInLog, groupId);
handlePipelineFailure(groupId, roleInfoProto);
handlePipelineFailure(groupId, roleInfoProto, "install snapshot notification");
}

/**
Expand Down Expand Up @@ -950,7 +944,7 @@ void handleLeaderChangedNotification(RaftGroupMemberId groupMemberId,
LOG.info("Leader change notification received for group: {} with new " +
"leaderId: {}", groupMemberId.getGroupId(), raftPeerId1);
// Save the reported leader to be sent with the report to SCM
boolean leaderForGroup = this.raftPeerId.equals(raftPeerId1);
final boolean leaderForGroup = server.getId().equals(raftPeerId1);
activePipelines.compute(groupMemberId.getGroupId(),
(key, value) -> value == null ? new ActivePipelineContext(leaderForGroup, false) :
new ActivePipelineContext(leaderForGroup, value.isPendingClose()));
Expand Down