Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exclude timed out response from adaptive tracker's histogram #1173

Merged
merged 8 commits into from
May 17, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public int warmUpConnections(List<DataNodeId> dataNodeIds, int connectionWarmUpP
connId);
expectedConnections++;
} catch (IOException e) {
logger.error("Received exception while warming up connection: {}", e);
logger.error("Received exception while warming up connection: ", e);
}
}
}
Expand All @@ -242,7 +242,7 @@ public int warmUpConnections(List<DataNodeId> dataNodeIds, int connectionWarmUpP
failedConnections += selector.disconnected().size();
handleSelectorEvents(null);
} catch (IOException e) {
logger.error("Warm up received unexpected error while polling: {}", e);
logger.error("Warm up received unexpected error while polling: ", e);
}
if (System.currentTimeMillis() - startTime > timeForWarmUp) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,17 @@ class AdaptiveOperationTracker extends SimpleOperationTracker {
}

@Override
public void onResponse(ReplicaId replicaId, boolean isSuccessFul) {
super.onResponse(replicaId, isSuccessFul);
public void onResponse(ReplicaId replicaId, RequestResult requestResult) {
super.onResponse(replicaId, requestResult);
long elapsedTime;
if (unexpiredRequestSendTimes.containsKey(replicaId)) {
elapsedTime = time.milliseconds() - unexpiredRequestSendTimes.remove(replicaId).getSecond();
} else {
elapsedTime = time.milliseconds() - expiredRequestSendTimes.remove(replicaId);
}
getLatencyHistogram(replicaId).update(elapsedTime);
if (requestResult != RequestResult.TIMED_OUT) {
getLatencyHistogram(replicaId).update(elapsedTime);
}
}

@Override
Expand All @@ -101,7 +103,7 @@ public Iterator<ReplicaId> getReplicaIterator() {
* @return the {@link Histogram} that tracks requests to the class of replicas (intra or inter DC) that
* {@code replicaId} belongs to.
*/
private Histogram getLatencyHistogram(ReplicaId replicaId) {
Histogram getLatencyHistogram(ReplicaId replicaId) {
if (replicaId.getDataNodeId().getDatacenterName().equals(datacenterName)) {
return localColoTracker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ private void processServerError(ReplicaId replica, ServerErrorCode serverErrorCo
switch (serverErrorCode) {
case No_Error:
case Blob_Deleted:
operationTracker.onResponse(replica, true);
operationTracker.onResponse(replica, RequestResult.SUCCESS);
if (RouterUtils.isRemoteReplica(routerConfig, replica)) {
logger.trace("Cross colo request successful for remote replica {} in {} ", replica.getDataNodeId(),
replica.getDataNodeId().getDatacenterName());
Expand Down Expand Up @@ -303,7 +303,8 @@ private void updateOperationState(ReplicaId replica, RouterErrorCode error) {
resolvedRouterErrorCode = error;
}
}
operationTracker.onResponse(replica, false);
operationTracker.onResponse(replica,
resolvedRouterErrorCode == RouterErrorCode.OperationTimedOut ? RequestResult.TIMED_OUT : RequestResult.FAILURE);
if (error != RouterErrorCode.BlobDeleted && error != RouterErrorCode.BlobExpired) {
routerMetrics.routerRequestErrorCount.inc();
}
Expand All @@ -315,7 +316,7 @@ private void updateOperationState(ReplicaId replica, RouterErrorCode error) {
*/
private void checkAndMaybeComplete() {
// operationCompleted is true if Blob_Authorization_Failure was received.
if (operationTracker.isDone() || operationCompleted == true) {
if (operationTracker.isDone() || operationCompleted) {
if (!operationTracker.hasSucceeded()) {
setOperationException(
new RouterException("The DeleteOperation could not be completed.", resolvedRouterErrorCode));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ private void cleanupExpiredInFlightRequests() {
while (inFlightRequestsIterator.hasNext()) {
Map.Entry<Integer, GetRequestInfo> entry = inFlightRequestsIterator.next();
if (time.milliseconds() - entry.getValue().startTimeMs > routerConfig.routerRequestTimeoutMs) {
onErrorResponse(entry.getValue().replicaId);
onErrorResponse(entry.getValue().replicaId, RouterErrorCode.OperationTimedOut);
logger.trace("GetBlobInfoRequest with correlationId {} in flight has expired for replica {} ", entry.getKey(),
entry.getValue().replicaId.getDataNodeId());
// Do not notify this as a failure to the response handler, as this timeout could simply be due to
Expand Down Expand Up @@ -208,15 +208,15 @@ void handleResponse(ResponseInfo responseInfo, GetResponse getResponse) {
logger.trace("GetBlobInfoRequest with response correlationId {} timed out for replica {} ", correlationId,
getRequestInfo.replicaId.getDataNodeId());
setOperationException(new RouterException("Operation timed out", RouterErrorCode.OperationTimedOut));
onErrorResponse(getRequestInfo.replicaId);
onErrorResponse(getRequestInfo.replicaId, RouterErrorCode.OperationTimedOut);
} else {
if (getResponse == null) {
logger.trace(
"GetBlobInfoRequest with response correlationId {} received an unexpected error on response deserialization from replica {} ",
correlationId, getRequestInfo.replicaId.getDataNodeId());
setOperationException(new RouterException("Response deserialization received an unexpected error",
RouterErrorCode.UnexpectedInternalError));
onErrorResponse(getRequestInfo.replicaId);
onErrorResponse(getRequestInfo.replicaId, RouterErrorCode.UnexpectedInternalError);
} else {
if (getResponse.getCorrelationId() != correlationId) {
// The NetworkClient associates a response with a request based on the fact that only one request is sent
Expand All @@ -230,7 +230,7 @@ void handleResponse(ResponseInfo responseInfo, GetResponse getResponse) {
"The correlation id in the GetResponse " + getResponse.getCorrelationId()
+ "is not the same as the correlation id in the associated GetRequest: " + correlationId,
RouterErrorCode.UnexpectedInternalError));
onErrorResponse(getRequestInfo.replicaId);
onErrorResponse(getRequestInfo.replicaId, RouterErrorCode.UnexpectedInternalError);
// we do not notify the ResponseHandler responsible for failure detection as this is an unexpected error.
} else {
try {
Expand All @@ -244,7 +244,7 @@ void handleResponse(ResponseInfo responseInfo, GetResponse getResponse) {
routerMetrics.responseDeserializationErrorCount.inc();
setOperationException(new RouterException("Response deserialization received an unexpected error", e,
RouterErrorCode.UnexpectedInternalError));
onErrorResponse(getRequestInfo.replicaId);
onErrorResponse(getRequestInfo.replicaId, RouterErrorCode.UnexpectedInternalError);
}
}
}
Expand All @@ -269,7 +269,7 @@ private void processGetBlobInfoResponse(GetRequestInfo getRequestInfo, GetRespon
setOperationException(new RouterException(
"Unexpected number of partition responses, expected: 1, " + "received: " + partitionsInResponse,
RouterErrorCode.UnexpectedInternalError));
onErrorResponse(getRequestInfo.replicaId);
onErrorResponse(getRequestInfo.replicaId, RouterErrorCode.UnexpectedInternalError);
// Again, no need to notify the responseHandler.
} else {
getError = getResponse.getPartitionResponseInfoList().get(0).getErrorCode();
Expand All @@ -280,12 +280,12 @@ private void processGetBlobInfoResponse(GetRequestInfo getRequestInfo, GetRespon
setOperationException(new RouterException(
"Unexpected number of messages in a partition response, expected: 1, " + "received: " + msgsInResponse,
RouterErrorCode.UnexpectedInternalError));
onErrorResponse(getRequestInfo.replicaId);
onErrorResponse(getRequestInfo.replicaId, RouterErrorCode.UnexpectedInternalError);
} else {
MessageMetadata messageMetadata = partitionResponseInfo.getMessageMetadataList().get(0);
MessageInfo messageInfo = partitionResponseInfo.getMessageInfoList().get(0);
handleBody(getResponse.getInputStream(), messageMetadata, messageInfo);
operationTracker.onResponse(getRequestInfo.replicaId, true);
operationTracker.onResponse(getRequestInfo.replicaId, RequestResult.SUCCESS);
if (RouterUtils.isRemoteReplica(routerConfig, getRequestInfo.replicaId)) {
logger.trace("Cross colo request successful for remote replica in {} ",
getRequestInfo.replicaId.getDataNodeId().getDatacenterName());
Expand All @@ -294,35 +294,35 @@ private void processGetBlobInfoResponse(GetRequestInfo getRequestInfo, GetRespon
}
} else {
// process and set the most relevant exception.
if (getError != ServerErrorCode.No_Error) {
logger.trace("Replica {} returned error {} with response correlationId {} ",
getRequestInfo.replicaId.getDataNodeId(), getError, getResponse.getCorrelationId());
}
processServerError(getError);
logger.trace("Replica {} returned error {} with response correlationId {} ",
getRequestInfo.replicaId.getDataNodeId(), getError, getResponse.getCorrelationId());
RouterErrorCode routerErrorCode = processServerError(getError);
if (getError == ServerErrorCode.Blob_Deleted || getError == ServerErrorCode.Blob_Expired
|| getError == ServerErrorCode.Blob_Authorization_Failure) {
// this is a successful response and one that completes the operation regardless of whether the
// success target has been reached or not.
operationCompleted = true;
} else {
onErrorResponse(getRequestInfo.replicaId);
}
// For any server error code other than ServerErrorCode.No_Error, onErrorResponse should be invoked because the
// operation itself does not succeed, even though the response is in some cases successful (e.g. Blob_Deleted).
onErrorResponse(getRequestInfo.replicaId, routerErrorCode);
}
}
} else {
logger.trace("Replica {} returned an error {} for a GetBlobInfoRequest with response correlationId : {} ",
getRequestInfo.replicaId.getDataNodeId(), getError, getResponse.getCorrelationId());
processServerError(getError);
onErrorResponse(getRequestInfo.replicaId);
onErrorResponse(getRequestInfo.replicaId, processServerError(getError));
}
}

/**
* Perform the necessary actions when a request to a replica fails.
* @param replicaId the {@link ReplicaId} associated with the failed response.
* @param routerErrorCode the {@link RouterErrorCode} associated with the failed response.
*/
void onErrorResponse(ReplicaId replicaId) {
operationTracker.onResponse(replicaId, false);
void onErrorResponse(ReplicaId replicaId, RouterErrorCode routerErrorCode) {
operationTracker.onResponse(replicaId,
routerErrorCode == RouterErrorCode.OperationTimedOut ? RequestResult.TIMED_OUT : RequestResult.FAILURE);
routerMetrics.routerRequestErrorCount.inc();
routerMetrics.getDataNodeBasedMetrics(replicaId.getDataNodeId()).getBlobInfoRequestErrorCount.inc();
}
Expand Down Expand Up @@ -382,36 +382,37 @@ private void handleBody(InputStream payload, MessageMetadata messageMetadata, Me
/**
* Process the given {@link ServerErrorCode} and set operation status accordingly.
* @param errorCode the {@link ServerErrorCode} to process.
* @return the {@link RouterErrorCode} mapped from server error code.
*/
private void processServerError(ServerErrorCode errorCode) {
private RouterErrorCode processServerError(ServerErrorCode errorCode) {
RouterErrorCode resolvedRouterErrorCode;
switch (errorCode) {
case Blob_Authorization_Failure:
logger.trace("Requested blob authorization failed");
setOperationException(
new RouterException("Server returned: " + errorCode, RouterErrorCode.BlobAuthorizationFailure));
resolvedRouterErrorCode = RouterErrorCode.BlobAuthorizationFailure;
break;
case Blob_Deleted:
logger.trace("Requested blob was deleted");
setOperationException(new RouterException("Server returned: " + errorCode, RouterErrorCode.BlobDeleted));
resolvedRouterErrorCode = RouterErrorCode.BlobDeleted;
break;
case Blob_Expired:
logger.trace("Requested blob has expired");
setOperationException(new RouterException("Server returned: " + errorCode, RouterErrorCode.BlobExpired));
resolvedRouterErrorCode = RouterErrorCode.BlobExpired;
break;
case Blob_Not_Found:
logger.trace("Requested blob was not found on this server");
setOperationException(new RouterException("Server returned: " + errorCode, RouterErrorCode.BlobDoesNotExist));
resolvedRouterErrorCode = RouterErrorCode.BlobDoesNotExist;
break;
case Disk_Unavailable:
case Replica_Unavailable:
logger.trace("Disk or replica on which the requested blob resides is not accessible");
setOperationException(new RouterException("Server returned: " + errorCode, RouterErrorCode.AmbryUnavailable));
resolvedRouterErrorCode = RouterErrorCode.AmbryUnavailable;
break;
default:
setOperationException(
new RouterException("Server returned: " + errorCode, RouterErrorCode.UnexpectedInternalError));
break;
resolvedRouterErrorCode = RouterErrorCode.UnexpectedInternalError;
}
setOperationException(new RouterException("Server returned: " + errorCode, resolvedRouterErrorCode));
return resolvedRouterErrorCode;
}

/**
Expand Down Expand Up @@ -444,5 +445,12 @@ private void checkAndMaybeComplete() {
NonBlockingRouter.completeOperation(null, getOperationCallback, operationResult, e);
}
}

/**
* Returns the {@link OperationTracker} stored in this operation's {@code operationTracker} field.
* The method has no access modifier, so it is visible only within the same package.
* NOTE(review): presumably exposed so tests can inspect the tracker in use - confirm against callers.
* @return {@link OperationTracker} associated with this operation
*/
OperationTracker getOperationTrackerInUse() {
return operationTracker;
}
}

Loading