diff --git a/ambry-api/src/main/java/com.github.ambry/network/ClientNetworkRequestMetrics.java b/ambry-api/src/main/java/com.github.ambry/network/ClientNetworkRequestMetrics.java deleted file mode 100644 index 1cb50cbc94..0000000000 --- a/ambry-api/src/main/java/com.github.ambry/network/ClientNetworkRequestMetrics.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Copyright 2016 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ -package com.github.ambry.network; - -import com.codahale.metrics.Histogram; - - -/** - * Tracks a set of metrics for a network request by a Client - */ -public class ClientNetworkRequestMetrics extends NetworkSendMetrics { - private final Histogram requestQueueTime; - private final Histogram requestTotalTime; - private long timeSpentTillNow; - - public ClientNetworkRequestMetrics(Histogram requestQueueTime, Histogram requestSendTime, Histogram requestTotalTime, - long timeSpentTillNow) { - super(requestSendTime); - this.requestQueueTime = requestQueueTime; - this.requestTotalTime = requestTotalTime; - this.timeSpentTillNow = timeSpentTillNow; - } - - /** - * Updates the time spent by the request in the queue before being sent out - * @param value the time spent by the request in the queue - */ - public void updateQueueTime(long value) { - requestQueueTime.update(value); - timeSpentTillNow += value; - } - - /** - * Updates few metrics once the send completes - * @param value the time spent by the request to be completely sent out - */ - @Override - public void updateSendTime(long value) { - super.updateSendTime(value); - timeSpentTillNow += value; - requestTotalTime.update(timeSpentTillNow); - } -} diff --git a/ambry-api/src/main/java/com.github.ambry/network/NetworkSend.java b/ambry-api/src/main/java/com.github.ambry/network/NetworkSend.java index 7980b2f8a2..a19acceec2 100644 --- a/ambry-api/src/main/java/com.github.ambry/network/NetworkSend.java +++ b/ambry-api/src/main/java/com.github.ambry/network/NetworkSend.java @@ -24,19 +24,34 @@ public class NetworkSend { private final String connectionId; // The bytes to be sent over the connection private final Send payload; + // The creation time of this send + private final long sendCreateTimeInMs; // The start time of this send - private final long sendStartTimeInMs; - private final NetworkSendMetrics metrics; + private long sendStartTimeInMs = -1; private final Time time; + private final ServerNetworkResponseMetrics metrics; - public NetworkSend(String connectionId, Send payload, NetworkSendMetrics metrics, Time time) { + public NetworkSend(String connectionId, Send payload, ServerNetworkResponseMetrics metrics, Time time) { this.connectionId = connectionId; this.payload = payload; + this.sendCreateTimeInMs = time.milliseconds(); this.time = time; - this.sendStartTimeInMs = time.milliseconds(); this.metrics = metrics; } + public long getSendCreateTimeInMs() { + return sendCreateTimeInMs; + } + + public boolean maySetSendStartTimeInMs() { + if (sendStartTimeInMs == -1) { + sendStartTimeInMs = time.milliseconds(); + return true; + } else { + return false; + } + } + public long getSendStartTimeInMs() { return sendStartTimeInMs; } @@ -49,7 +64,7 @@ public Send getPayload() { return payload; } - public void onSendComplete() { + public void updateServerResponseMetrics() { if (metrics != null) { metrics.updateSendTime(time.milliseconds() - sendStartTimeInMs); } diff --git a/ambry-api/src/main/java/com.github.ambry/network/NetworkSendMetrics.java b/ambry-api/src/main/java/com.github.ambry/network/NetworkSendMetrics.java deleted file mode 100644 index fb023a7ed5..0000000000 --- a/ambry-api/src/main/java/com.github.ambry/network/NetworkSendMetrics.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Copyright 2016 LinkedIn Corp. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ -package com.github.ambry.network; - -import com.codahale.metrics.Histogram; - - -/** - * Tracks send time metrics for a {@link NetworkSend} - */ -public class NetworkSendMetrics { - private final Histogram sendTime; - - public NetworkSendMetrics(Histogram sendTime) { - this.sendTime = sendTime; - } - - /** - * Updates sendTime when the networkSend has been sent out completely. - * @param value the time spent by the request or response to be sent out completely - */ - public void updateSendTime(long value) { - sendTime.update(value); - } -} diff --git a/ambry-api/src/main/java/com.github.ambry/network/ServerNetworkResponseMetrics.java b/ambry-api/src/main/java/com.github.ambry/network/ServerNetworkResponseMetrics.java index 80a6c18dca..a82f45966f 100644 --- a/ambry-api/src/main/java/com.github.ambry/network/ServerNetworkResponseMetrics.java +++ b/ambry-api/src/main/java/com.github.ambry/network/ServerNetworkResponseMetrics.java @@ -19,9 +19,10 @@ /** * Tracks a set of metrics for a network response by a Server */ -public class ServerNetworkResponseMetrics extends NetworkSendMetrics { +public class ServerNetworkResponseMetrics { private final Histogram responseQueueTime; + private final Histogram responseSendTime; private final Histogram responseTotalTime; private final Histogram responseSendTimeBySize; private final Histogram responseTotalTimeBySize; @@ -30,8 +31,8 @@ public class ServerNetworkResponseMetrics extends NetworkSendMetrics { public ServerNetworkResponseMetrics(Histogram responseQueueTime, Histogram responseSendTime, Histogram responseTotalTime, Histogram responseSendTimeBySize, Histogram responseTotalTimeBySize, long timeSpentTillNow) { - super(responseSendTime); this.responseQueueTime = responseQueueTime; + this.responseSendTime = responseSendTime; this.responseTotalTime = responseTotalTime; this.responseSendTimeBySize = responseSendTimeBySize; this.responseTotalTimeBySize = responseTotalTimeBySize; @@ -51,9 +52,8 @@ public void updateQueueTime(long value) { * Updates few metrics when send completes * @param value the time spent by the response to be completely sent */ - @Override public void updateSendTime(long value) { - super.updateSendTime(value); + responseSendTime.update(value); if (responseSendTimeBySize != null) { responseSendTimeBySize.update(value); } diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrCluster.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrCluster.java index 3fe3107f6d..a79556d669 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrCluster.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrCluster.java @@ -80,7 +80,7 @@ public HelixVcrCluster(CloudConfig cloudConfig, ClusterMapConfig clusterMapConfi } /** - * Add {@link PartitionId} to assignedPartitionIds set, if {@parm partitionIdStr} valid. + * Add {@link PartitionId} to assignedPartitionIds set, if {@param partitionIdStr} valid. * Used in {@link HelixVcrStateModel} if current VCR becomes leader of a partition. * @param partitionIdStr The partitionIdStr notified by Helix. */ @@ -104,7 +104,7 @@ public void addPartition(String partitionIdStr) { } /** - * Remove {@link PartitionId} from assignedPartitionIds set, if {@parm partitionIdStr} valid. + * Remove {@link PartitionId} from assignedPartitionIds set, if {@param partitionIdStr} valid. * Used in {@link HelixVcrStateModel} if current VCR becomes offline or standby a partition. * @param partitionIdStr The partitionIdStr notified by Helix. */ diff --git a/ambry-network/src/main/java/com.github.ambry.network/NetworkClient.java b/ambry-network/src/main/java/com.github.ambry.network/NetworkClient.java index 5f93e3d259..62c7868a28 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/NetworkClient.java +++ b/ambry-network/src/main/java/com.github.ambry.network/NetworkClient.java @@ -100,10 +100,7 @@ public List sendAndPoll(List requestInfos, int pollTi List responseInfoList = new ArrayList<>(); try { for (RequestInfo requestInfo : requestInfos) { - ClientNetworkRequestMetrics clientNetworkRequestMetrics = - new ClientNetworkRequestMetrics(networkMetrics.requestQueueTime, networkMetrics.requestSendTime, - networkMetrics.requestSendTotalTime, 0); - pendingRequests.add(new RequestMetadata(time.milliseconds(), requestInfo, clientNetworkRequestMetrics)); + pendingRequests.add(new RequestMetadata(requestInfo)); } List sends = prepareSends(responseInfoList); if (networkConfig.networkClientEnableConnectionReplenishment) { @@ -185,8 +182,7 @@ private List prepareSends(List responseInfoList) { requestMetadata.pendingConnectionId = null; } logger.trace("Connection checkout succeeded for {}:{} with connectionId {} ", host, port, connId); - sends.add(new NetworkSend(connId, requestMetadata.requestInfo.getRequest(), - requestMetadata.clientNetworkRequestMetrics, time)); + sends.add(new NetworkSend(connId, requestMetadata.requestInfo.getRequest(), null, time)); connectionIdToRequestInFlight.put(connId, requestMetadata); iter.remove(); requestMetadata.onRequestDequeue(); @@ -340,8 +336,6 @@ public void wakeup() { * A class that consists of a {@link RequestInfo} and some metadata related to the request */ private class RequestMetadata { - // to track network request related metrics - ClientNetworkRequestMetrics clientNetworkRequestMetrics; // the RequestInfo associated with the request. RequestInfo requestInfo; // the time at which this request was queued. @@ -358,11 +352,9 @@ private class RequestMetadata { // check out this connection. This, however, does not affect correctness. private String pendingConnectionId; - RequestMetadata(long requestQueuedAtMs, RequestInfo requestInfo, - ClientNetworkRequestMetrics clientNetworkRequestMetrics) { + RequestMetadata(RequestInfo requestInfo) { this.requestInfo = requestInfo; - this.requestQueuedAtMs = requestQueuedAtMs; - this.clientNetworkRequestMetrics = clientNetworkRequestMetrics; + this.requestQueuedAtMs = time.milliseconds(); this.pendingConnectionId = null; } @@ -370,16 +362,16 @@ private class RequestMetadata { * Actions to be done on dequeue of this request and ready to be sent */ void onRequestDequeue() { - requestDequeuedAtMs = System.currentTimeMillis(); - clientNetworkRequestMetrics.updateQueueTime(requestDequeuedAtMs - requestQueuedAtMs); + requestDequeuedAtMs = time.milliseconds(); + networkMetrics.networkClientRequestQueueTime.update(requestDequeuedAtMs - requestQueuedAtMs); } /** * Actions to be done on receiving response for the request sent */ void onResponseReceive() { - networkMetrics.requestResponseRoundTripTime.update(System.currentTimeMillis() - requestDequeuedAtMs); - networkMetrics.requestResponseTotalTime.update(System.currentTimeMillis() - requestQueuedAtMs); + networkMetrics.networkClientRoundTripTime.update(time.milliseconds() - requestDequeuedAtMs); + networkMetrics.networkClientTotalTime.update(time.milliseconds() - requestQueuedAtMs); } } } diff --git a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java index 316fb6e8b8..671540fede 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java +++ b/ambry-network/src/main/java/com.github.ambry.network/NetworkMetrics.java @@ -36,6 +36,9 @@ public class NetworkMetrics { public final Histogram selectorSelectTime; public final Counter selectorIOCount; public final Histogram selectorIOTime; + public final Counter selectorReadyKeyCount; + public final Counter selectorReadKeyCount; + public final Counter selectorWriteKeyCount; public final Counter selectorNioCloseErrorCount; public final Counter selectorDisconnectedErrorCount; public final Counter selectorIOErrorCount; @@ -45,17 +48,29 @@ public class NetworkMetrics { private final List selectorActiveConnectionsList; private final List> selectorUnreadyConnectionsList; - // Plaintext metrics - // the bytes rate to receive the entire request - public final Meter plaintextReceiveBytesRate; - // the bytes rate to send the entire response - public final Meter plaintextSendBytesRate; - // the time to receive 1KB data in one read call - public final Histogram plaintextReceiveTimeInUsPerKB; - // the time to send 1KB data in one write call - public final Histogram plaintextSendTimeInUsPerKB; - // the time to send data in one write call - public final Histogram plaintextSendTime; + // Transmission metrics + // The time from NetworkSend create to send start + public final Histogram transmissionSendPendingTime; + // From the first byte to all data write to socket channel + public final Histogram transmissionSendAllTime; + // From last byte of request sent out to first byte of response received + public final Histogram transmissionRoundTripTime; + // From the first byte to all data read from socket channel + public final Histogram transmissionReceiveAllTime; + // The bytes rate to send the entire response + public final Meter transmissionSendBytesRate; + //Tthe bytes rate to receive the entire request + public final Meter transmissionReceiveBytesRate; + + // For a single read/write in transmission + // The time to send data in one write call + public final Histogram transmissionSendTime; + // The size of date sent in one write call + public final Histogram transmissionSendSize; + // The time to send data in one read call + public final Histogram transmissionReceiveTime; + // The size of date sent in one read call + public final Histogram transmissionReceiveSize; // SSL metrics public final Counter sslFactoryInitializationCount; @@ -65,28 +80,17 @@ public class NetworkMetrics { public final Histogram sslHandshakeTime; public final Counter sslHandshakeCount; public final Counter sslHandshakeErrorCount; - // the bytes rate to receive the entire request - public final Meter sslReceiveBytesRate; - // the bytes rate to send the entire response - public final Meter sslSendBytesRate; - // the time to receive 1KB data in one read call - public final Histogram sslReceiveTimeInUsPerKB; - // the time to send 1KB data in one write call - public final Histogram sslSendTimeInUsPerKB; - public final Histogram sslEncryptionTimeInUsPerKB; - public final Histogram sslDecryptionTimeInUsPerKB; - // the time to send data in one write call - public final Histogram sslSendTime; - // the count of renegotiation after initial handshake done + // The count of renegotiation after initial handshake done public final Counter sslRenegotiationCount; + public final Meter sslEncryptionTimeInUsPerKB; + public final Meter sslDecryptionTimeInUsPerKB; // NetworkClient metrics public final Histogram networkClientSendAndPollTime; - public final Histogram requestQueueTime; - public final Histogram requestSendTime; - public final Histogram requestSendTotalTime; - public final Histogram requestResponseRoundTripTime; - public final Histogram requestResponseTotalTime; + public final Histogram networkClientRequestQueueTime; + public final Histogram networkClientRoundTripTime; + // NetworkClient request queuing time plus round trip time. + public final Histogram networkClientTotalTime; public final Counter connectionCheckoutTimeoutError; public final Counter connectionNotAvailable; @@ -103,6 +107,9 @@ public NetworkMetrics(MetricRegistry registry) { selectorConnectionCreated = registry.counter(MetricRegistry.name(Selector.class, "SelectorConnectionCreated")); selectorSelectCount = registry.counter(MetricRegistry.name(Selector.class, "SelectorSelectCount")); selectorIOCount = registry.counter(MetricRegistry.name(Selector.class, "SelectorIOCount")); + selectorReadyKeyCount = registry.counter(MetricRegistry.name(Selector.class, "SelectorReadyKeyCount")); + selectorReadKeyCount = registry.counter(MetricRegistry.name(Selector.class, "SelectorReadKeyCount")); + selectorWriteKeyCount = registry.counter(MetricRegistry.name(Selector.class, "SelectorWriteKeyCount")); selectorSelectTime = registry.histogram(MetricRegistry.name(Selector.class, "SelectorSelectTime")); selectorIOTime = registry.histogram(MetricRegistry.name(Selector.class, "SelectorIOTime")); selectorNioCloseErrorCount = registry.counter(MetricRegistry.name(Selector.class, "SelectorNioCloseErrorCount")); @@ -114,19 +121,21 @@ public NetworkMetrics(MetricRegistry registry) { selectorCloseKeyErrorCount = registry.counter(MetricRegistry.name(Selector.class, "SelectorCloseKeyErrorCount")); selectorCloseSocketErrorCount = registry.counter(MetricRegistry.name(Selector.class, "SelectorCloseSocketErrorCount")); - plaintextReceiveBytesRate = registry.meter(MetricRegistry.name(Selector.class, "PlaintextReceiveBytesRate")); - plaintextSendBytesRate = registry.meter(MetricRegistry.name(Selector.class, "PlaintextSendBytesRate")); - plaintextReceiveTimeInUsPerKB = - registry.histogram(MetricRegistry.name(Selector.class, "PlaintextReceiveTimeInUsPerKB")); - plaintextSendTimeInUsPerKB = registry.histogram(MetricRegistry.name(Selector.class, "PlaintextSendTimeInUsPerKB")); - plaintextSendTime = registry.histogram(MetricRegistry.name(Selector.class, "PlaintextSendTime")); - sslReceiveBytesRate = registry.meter(MetricRegistry.name(Selector.class, "SslReceiveBytesRate")); - sslSendBytesRate = registry.meter(MetricRegistry.name(Selector.class, "SslSendBytesRate")); - sslEncryptionTimeInUsPerKB = registry.histogram(MetricRegistry.name(Selector.class, "SslEncryptionTimeInUsPerKB")); - sslDecryptionTimeInUsPerKB = registry.histogram(MetricRegistry.name(Selector.class, "SslDecryptionTimeInUsPerKB")); - sslReceiveTimeInUsPerKB = registry.histogram(MetricRegistry.name(Selector.class, "SslReceiveTimeInUsPerKB")); - sslSendTimeInUsPerKB = registry.histogram(MetricRegistry.name(Selector.class, "SslSendTimeInUsPerKB")); - sslSendTime = registry.histogram(MetricRegistry.name(Selector.class, "SslSendTime")); + + transmissionSendPendingTime = + registry.histogram(MetricRegistry.name(Selector.class, "TransmissionSendPendingTime")); + transmissionSendAllTime = registry.histogram(MetricRegistry.name(Selector.class, "TransmissionSendTime")); + transmissionRoundTripTime = + registry.histogram(MetricRegistry.name(Selector.class, "TransmissionRoundTripTime")); + transmissionReceiveAllTime = + registry.histogram(MetricRegistry.name(Selector.class, "TransmissionReceiveTime")); + transmissionSendBytesRate = registry.meter(MetricRegistry.name(Selector.class, "TransmissionSendBytesRate")); + transmissionReceiveBytesRate = registry.meter(MetricRegistry.name(Selector.class, "TransmissionReceiveBytesRate")); + transmissionSendTime = registry.histogram(MetricRegistry.name(Selector.class, "TransmissionSendTime")); + transmissionSendSize = registry.histogram(MetricRegistry.name(Selector.class, "TransmissionSendSize")); + transmissionReceiveTime = registry.histogram(MetricRegistry.name(Selector.class, "TransmissionReceiveTime")); + transmissionReceiveSize = registry.histogram(MetricRegistry.name(Selector.class, "TransmissionReceiveSize")); + sslFactoryInitializationCount = registry.counter(MetricRegistry.name(Selector.class, "SslFactoryInitializationCount")); sslFactoryInitializationErrorCount = @@ -139,15 +148,16 @@ public NetworkMetrics(MetricRegistry registry) { sslHandshakeCount = registry.counter(MetricRegistry.name(Selector.class, "SslHandshakeCount")); sslHandshakeErrorCount = registry.counter(MetricRegistry.name(Selector.class, "SslHandshakeErrorCount")); sslRenegotiationCount = registry.counter(MetricRegistry.name(Selector.class, "SslRenegotiationCount")); + sslEncryptionTimeInUsPerKB = registry.meter(MetricRegistry.name(Selector.class, "SslEncryptionTimeInUsPerKB")); + sslDecryptionTimeInUsPerKB = registry.meter(MetricRegistry.name(Selector.class, "SslDecryptionTimeInUsPerKB")); networkClientSendAndPollTime = registry.histogram(MetricRegistry.name(NetworkClient.class, "NetworkClientSendAndPollTime")); - requestQueueTime = registry.histogram(MetricRegistry.name(NetworkClient.class, "RequestQueueTime")); - requestSendTime = registry.histogram(MetricRegistry.name(NetworkClient.class, "RequestSendTime")); - requestSendTotalTime = registry.histogram(MetricRegistry.name(NetworkClient.class, "RequestSendTotalTime")); - requestResponseRoundTripTime = - registry.histogram(MetricRegistry.name(NetworkClient.class, "RequestResponseRoundTripTime")); - requestResponseTotalTime = registry.histogram(MetricRegistry.name(NetworkClient.class, "RequestResponseTotalTime")); + networkClientRequestQueueTime = + registry.histogram(MetricRegistry.name(NetworkClient.class, "NetworkClientRequestQueueTime")); + networkClientRoundTripTime = + registry.histogram(MetricRegistry.name(NetworkClient.class, "NetworkClientRoundTripTime")); + networkClientTotalTime = registry.histogram(MetricRegistry.name(NetworkClient.class, "NetworkClientTotalTime")); connectionCheckoutTimeoutError = registry.counter(MetricRegistry.name(NetworkClient.class, "ConnectionCheckoutTimeoutError")); connectionNotAvailable = registry.counter(MetricRegistry.name(NetworkClient.class, "ConnectionNotAvailable")); diff --git a/ambry-network/src/main/java/com.github.ambry.network/PlainTextTransmission.java b/ambry-network/src/main/java/com.github.ambry.network/PlainTextTransmission.java index 6704ad4f2a..b9860d084d 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/PlainTextTransmission.java +++ b/ambry-network/src/main/java/com.github.ambry.network/PlainTextTransmission.java @@ -18,7 +18,6 @@ import java.io.IOException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; -import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,14 +60,16 @@ public boolean ready() { public boolean read() throws IOException { if (!hasReceive()) { networkReceive = new NetworkReceive(getConnectionId(), new BoundedByteBufferReceive(), time); + metrics.transmissionRoundTripTime.update(time.milliseconds() - sendCompleteTime); } - long startTimeMs = SystemTime.getInstance().milliseconds(); + long startTimeMs = time.milliseconds(); long bytesRead = networkReceive.getReceivedBytes().readFrom(socketChannel); - long readTimeMs = SystemTime.getInstance().milliseconds() - startTimeMs; + long readTimeMs = time.milliseconds() - startTimeMs; logger.trace("Bytes read " + bytesRead + " from {} using key {} Time: {}", socketChannel.socket().getRemoteSocketAddress(), getConnectionId(), readTimeMs); if (bytesRead > 0) { - metrics.plaintextReceiveTimeInUsPerKB.update(TimeUnit.MILLISECONDS.toMicros(readTimeMs) * 1024 / bytesRead); + metrics.transmissionReceiveTime.update(readTimeMs); + metrics.transmissionReceiveSize.update(bytesRead); } return networkReceive.getReceivedBytes().isReadComplete(); } @@ -85,14 +86,17 @@ public boolean write() throws IOException { if (send == null) { throw new IllegalStateException("Registered for write interest but no response attached to key."); } - long startTimeMs = SystemTime.getInstance().milliseconds(); + if (networkSend.maySetSendStartTimeInMs()) { + metrics.transmissionSendPendingTime.update(time.milliseconds() - networkSend.getSendCreateTimeInMs()); + } + long startTimeMs = time.milliseconds(); long bytesWritten = send.writeTo(socketChannel); - long writeTimeMs = SystemTime.getInstance().milliseconds() - startTimeMs; + long writeTimeMs = time.milliseconds() - startTimeMs; logger.trace("Bytes written {} to {} using key {} Time: {}", bytesWritten, socketChannel.socket().getRemoteSocketAddress(), getConnectionId(), writeTimeMs); if (bytesWritten > 0) { - metrics.plaintextSendTimeInUsPerKB.update(TimeUnit.MILLISECONDS.toMicros(writeTimeMs) * 1024 / bytesWritten); - metrics.plaintextSendTime.update(writeTimeMs); + metrics.transmissionSendTime.update(writeTimeMs); + metrics.transmissionSendSize.update(bytesWritten); } return send.isSendComplete(); } @@ -114,26 +118,4 @@ public void close() { logger.error("Exception closing connection to node {}:", getConnectionId(), e); } } - - /** - * Actions to be taken on completion of {@link Send} in {@link NetworkSend} - */ - @Override - public void onSendComplete() { - long sendTimeMs = SystemTime.getInstance().milliseconds() - networkSend.getSendStartTimeInMs(); - networkSend.onSendComplete(); - double sendBytesRate = networkSend.getPayload().sizeInBytes() / ((double) sendTimeMs / SystemTime.MsPerSec); - metrics.plaintextSendBytesRate.mark((long) sendBytesRate); - } - - /** - * Actions to be taken on completion of {@link BoundedByteBufferReceive} in {@link NetworkReceive} - */ - @Override - public void onReceiveComplete() { - long receiveTimeMs = SystemTime.getInstance().milliseconds() - networkReceive.getReceiveStartTimeInMs(); - double receiveBytesRate = - networkReceive.getReceivedBytes().sizeRead() / ((double) receiveTimeMs / SystemTime.MsPerSec); - metrics.plaintextReceiveBytesRate.mark((long) receiveBytesRate); - } } diff --git a/ambry-network/src/main/java/com.github.ambry.network/SSLTransmission.java b/ambry-network/src/main/java/com.github.ambry.network/SSLTransmission.java index 903a237537..6b9bf5d981 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/SSLTransmission.java +++ b/ambry-network/src/main/java/com.github.ambry.network/SSLTransmission.java @@ -385,14 +385,16 @@ private SSLEngineResult handshakeUnwrap(Boolean doRead) throws IOException { public boolean read() throws IOException { if (!hasReceive()) { this.networkReceive = new NetworkReceive(getConnectionId(), new BoundedByteBufferReceive(), time); + metrics.transmissionRoundTripTime.update(time.milliseconds() - sendCompleteTime); } - long startTimeMs = SystemTime.getInstance().milliseconds(); + long startTimeMs = time.milliseconds(); long bytesRead = networkReceive.getReceivedBytes().readFrom(this); - long readTimeMs = SystemTime.getInstance().milliseconds() - startTimeMs; - logger.trace("Bytes read {} from {} using key {} Time: {}", bytesRead, + long readTimeMs = time.milliseconds() - startTimeMs; + logger.trace("Bytes read {} from {} using key {} Time: {} Ms.", bytesRead, socketChannel.socket().getRemoteSocketAddress(), getConnectionId(), readTimeMs); if (bytesRead > 0) { - metrics.sslReceiveTimeInUsPerKB.update(TimeUnit.MILLISECONDS.toMicros(readTimeMs) * 1024 / bytesRead); + metrics.transmissionReceiveTime.update(readTimeMs); + metrics.transmissionReceiveSize.update(bytesRead); } return networkReceive.getReceivedBytes().isReadComplete(); } @@ -435,7 +437,7 @@ public int read(ByteBuffer dst) throws IOException { long decryptionTimeMs = SystemTime.getInstance().milliseconds() - startTimeMs; logger.trace("SSL decryption time: {} ms for {} bytes", decryptionTimeMs, unwrapResult.bytesProduced()); if (unwrapResult.bytesProduced() > 0) { - metrics.sslDecryptionTimeInUsPerKB.update( + metrics.sslDecryptionTimeInUsPerKB.mark( TimeUnit.MILLISECONDS.toMicros(decryptionTimeMs) * 1024 / unwrapResult.bytesProduced()); } netReadBuffer.compact(); @@ -498,14 +500,17 @@ public boolean write() throws IOException { return false; } } - long startTimeMs = SystemTime.getInstance().milliseconds(); + if (networkSend.maySetSendStartTimeInMs()) { + metrics.transmissionSendPendingTime.update(time.milliseconds() - networkSend.getSendCreateTimeInMs()); + } + long startTimeMs = time.milliseconds(); long bytesWritten = send.writeTo(this); - long writeTimeMs = SystemTime.getInstance().milliseconds() - startTimeMs; - logger.trace("Bytes written {} to {} using key {} Time: {}", bytesWritten, + long writeTimeMs = time.milliseconds() - startTimeMs; + logger.trace("Bytes written {} to {} using key {} Time: {} ms", bytesWritten, socketChannel.socket().getRemoteSocketAddress(), getConnectionId(), writeTimeMs); if (bytesWritten > 0) { - metrics.sslSendTimeInUsPerKB.update(TimeUnit.MILLISECONDS.toMicros(writeTimeMs) * 1024 / bytesWritten); - metrics.sslSendTime.update(writeTimeMs); + metrics.transmissionSendTime.update(writeTimeMs); + metrics.transmissionSendSize.update(bytesWritten); } return (send.isSendComplete() && netWriteBuffer.remaining() == 0); } @@ -538,7 +543,7 @@ public int write(ByteBuffer src) throws IOException { long encryptionTimeNs = SystemTime.getInstance().nanoseconds() - startTimeNs; logger.trace("SSL encryption time: {} ns for {} bytes", encryptionTimeNs, wrapResult.bytesConsumed()); if (wrapResult.bytesConsumed() > 0) { - metrics.sslEncryptionTimeInUsPerKB.update(encryptionTimeNs / wrapResult.bytesConsumed()); + metrics.sslEncryptionTimeInUsPerKB.mark(encryptionTimeNs / wrapResult.bytesConsumed()); } netWriteBuffer.flip(); //handle ssl renegotiation @@ -675,26 +680,4 @@ private void handshakeFailure() { logger.debug("SSLEngine.closeInBound() raised an exception.", e); } } - - /** - * Actions to be taken on completion of {@link Send} in {@link NetworkSend} - */ - @Override - public void onSendComplete() { - long sendTimeMs = SystemTime.getInstance().milliseconds() - networkSend.getSendStartTimeInMs(); - networkSend.onSendComplete(); - double sendBytesRate = networkSend.getPayload().sizeInBytes() / ((double) sendTimeMs / SystemTime.MsPerSec); - metrics.sslSendBytesRate.mark((long) sendBytesRate); - } - - /** - * Actions to be taken on completion of {@link BoundedByteBufferReceive} in {@link NetworkReceive} - */ - @Override - public void onReceiveComplete() { - long receiveTimeMs = SystemTime.getInstance().milliseconds() - networkReceive.getReceiveStartTimeInMs(); - double receiveBytesRate = - networkReceive.getReceivedBytes().sizeRead() / ((double) receiveTimeMs / SystemTime.MsPerSec); - metrics.sslReceiveBytesRate.mark((long) receiveBytesRate); - } } diff --git a/ambry-network/src/main/java/com.github.ambry.network/Selector.java b/ambry-network/src/main/java/com.github.ambry.network/Selector.java index 75affe0ff3..5d0493e672 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Selector.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Selector.java @@ -335,8 +335,9 @@ private void pollOnMainThread(long timeoutMs, List sends) throws IO if (readyKeys > 0) { long endSelect = time.milliseconds(); - this.metrics.selectorSelectTime.update(endSelect - startSelect); + metrics.selectorSelectTime.update(endSelect - startSelect); Set keys = nioSelector.selectedKeys(); + metrics.selectorReadyKeyCount.inc(keys.size()); Iterator iter = keys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); @@ -361,6 +362,7 @@ private void pollOnMainThread(long timeoutMs, List sends) throws IO } if (key.isReadable() && transmission.ready()) { + metrics.selectorReadKeyCount.inc(); NetworkReceive networkReceive = read(key, transmission); if (networkReceive == null) { // Exception happened in read. @@ -369,6 +371,7 @@ private void pollOnMainThread(long timeoutMs, List sends) throws IO this.completedReceives.add(networkReceive); } } else if (key.isWritable() && transmission.ready()) { + metrics.selectorWriteKeyCount.inc(); NetworkSend networkSend = write(key, transmission); if (networkSend == null) { // Exception happened in write. @@ -389,8 +392,8 @@ private void pollOnMainThread(long timeoutMs, List sends) throws IO } } checkUnreadyConnectionsStatus(); - this.metrics.selectorIOCount.inc(readyKeys); - this.metrics.selectorIOTime.update(time.milliseconds() - endSelect); + metrics.selectorIOCount.inc(); + metrics.selectorIOTime.update(time.milliseconds() - endSelect); } disconnected.addAll(closedConnections); closedConnections.clear(); @@ -421,6 +424,7 @@ private void pollWithExecutorPool(long timeoutMs, List sends) throw long endSelect = time.milliseconds(); this.metrics.selectorSelectTime.update(endSelect - startSelect); Set keys = nioSelector.selectedKeys(); + metrics.selectorReadyKeyCount.inc(keys.size()); Iterator iter = keys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); @@ -445,9 +449,11 @@ private void pollWithExecutorPool(long timeoutMs, List sends) throw } if (key.isReadable() && transmission.ready()) { + metrics.selectorReadKeyCount.inc(); completedReceivesFutures.add(executorPool.submit(() -> read(key, transmission))); readWriteKeySet.add(key); } else if (key.isWritable() && transmission.ready()) { + metrics.selectorWriteKeyCount.inc(); completedSendsFutures.add(executorPool.submit(() -> write(key, transmission))); readWriteKeySet.add(key); } else if (!key.isValid()) { @@ -493,8 +499,8 @@ private void pollWithExecutorPool(long timeoutMs, List sends) throw close(keyWithError); } checkUnreadyConnectionsStatus(); - this.metrics.selectorIOCount.inc(readyKeys); - this.metrics.selectorIOTime.update(time.milliseconds() - endSelect); + metrics.selectorIOCount.inc(); + metrics.selectorIOTime.update(time.milliseconds() - endSelect); } disconnected.addAll(closedConnections); closedConnections.clear(); diff --git a/ambry-network/src/main/java/com.github.ambry.network/SocketRequestResponseChannel.java b/ambry-network/src/main/java/com.github.ambry.network/SocketRequestResponseChannel.java index 74e7d14f2d..3820ab137b 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/SocketRequestResponseChannel.java +++ b/ambry-network/src/main/java/com.github.ambry.network/SocketRequestResponseChannel.java @@ -97,7 +97,7 @@ public void onDequeueFromResponseQueue() { } } - public NetworkSendMetrics getMetrics() { + public ServerNetworkResponseMetrics getMetrics() { return metrics; } } diff --git a/ambry-network/src/main/java/com.github.ambry.network/Transmission.java b/ambry-network/src/main/java/com.github.ambry.network/Transmission.java index e2ddac7cf1..d55e6cd299 100644 --- a/ambry-network/src/main/java/com.github.ambry.network/Transmission.java +++ b/ambry-network/src/main/java/com.github.ambry.network/Transmission.java @@ -13,6 +13,7 @@ */ package com.github.ambry.network; +import com.github.ambry.utils.SystemTime; import com.github.ambry.utils.Time; import java.io.IOException; import java.net.SocketAddress; @@ -35,6 +36,7 @@ public abstract class Transmission { protected SelectionKey key = null; protected final Time time; protected final NetworkMetrics metrics; + protected long sendCompleteTime; public Transmission(String connectionId, SocketChannel socketChannel, SelectionKey key, Time time, NetworkMetrics metrics) { @@ -100,12 +102,25 @@ public void setNetworkSend(NetworkSend networkSend) { /** * Actions to be taken on completion of {@link Send} in {@link NetworkSend} */ - public abstract void onSendComplete(); + public void onSendComplete() { + networkSend.updateServerResponseMetrics(); + sendCompleteTime = time.milliseconds(); + long sendTimeMs = sendCompleteTime - networkSend.getSendStartTimeInMs(); + metrics.transmissionSendAllTime.update(sendTimeMs); + double sendBytesRate = networkSend.getPayload().sizeInBytes() / ((double) sendTimeMs / SystemTime.MsPerSec); + metrics.transmissionSendBytesRate.mark((long) sendBytesRate); + } /** * Actions to be taken on completion of {@link BoundedByteBufferReceive} in {@link NetworkReceive} */ - public abstract void onReceiveComplete(); + public void onReceiveComplete() { + long receiveTimeMs = time.milliseconds() - networkReceive.getReceiveStartTimeInMs(); + metrics.transmissionReceiveAllTime.update(receiveTimeMs); + double receiveBytesRate = + networkReceive.getReceivedBytes().sizeRead() / ((double) receiveTimeMs / SystemTime.MsPerSec); + metrics.transmissionReceiveBytesRate.mark((long) receiveBytesRate); + } /** * Returns true if {@link NetworkReceive} is read completely diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/RouterServerPlaintextTest.java b/ambry-server/src/integration-test/java/com.github.ambry.server/RouterServerPlaintextTest.java index d7cb5691c0..d64723c971 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/RouterServerPlaintextTest.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/RouterServerPlaintextTest.java @@ -48,8 +48,8 @@ public class RouterServerPlaintextTest { private static MockCluster plaintextCluster; private static RouterServerTestFramework testFramework; private static MetricRegistry routerMetricRegistry; - private static long plainTextSendBytesCountBeforeTest; - private static long plainTextReceiveBytesCountBeforeTest; + private static long transmissionSendBytesCountBeforeTest; + private static long transmissionReceiveBytesCountBeforeTest; private static Account refAccount; private static List refContainers = new ArrayList<>(); @@ -119,20 +119,17 @@ public static void cleanup() throws IOException { @Before public void before() { Map meters = routerMetricRegistry.getMeters(); - plainTextSendBytesCountBeforeTest = meters.get(plaintextSendBytesMetricName).getCount(); - plainTextReceiveBytesCountBeforeTest = meters.get(plaintextReceiveBytesMetricName).getCount(); + transmissionSendBytesCountBeforeTest = meters.get(transmissionSendBytesMetricName).getCount(); + transmissionReceiveBytesCountBeforeTest = meters.get(transmissionReceiveBytesMetricName).getCount(); } @After public void after() { Map meters = routerMetricRegistry.getMeters(); - Assert.assertTrue("Router should have sent over Plain Text", - meters.get(plaintextSendBytesMetricName).getCount() != plainTextSendBytesCountBeforeTest); - Assert.assertTrue("Router should have received over Plain Text", - meters.get(plaintextReceiveBytesMetricName).getCount() != plainTextReceiveBytesCountBeforeTest); - Assert.assertTrue("Router should not have sent over SSL", meters.get(sslSendBytesMetricName).getCount() == 0); - Assert.assertTrue("Router should not have received over SSL", - meters.get(sslReceiveBytesMetricName).getCount() == 0); + Assert.assertTrue("Router should have been sent", + meters.get(transmissionSendBytesMetricName).getCount() != transmissionSendBytesCountBeforeTest); + Assert.assertTrue("Router should have been sent", + meters.get(transmissionReceiveBytesMetricName).getCount() != transmissionReceiveBytesCountBeforeTest); } /** diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/RouterServerSSLTest.java b/ambry-server/src/integration-test/java/com.github.ambry.server/RouterServerSSLTest.java index 4dde34e0a8..f318959ede 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/RouterServerSSLTest.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/RouterServerSSLTest.java @@ -47,8 +47,8 @@ public class RouterServerSSLTest { private static MockCluster sslCluster; private static RouterServerTestFramework testFramework; private static MetricRegistry routerMetricRegistry; - private static long sslSendBytesCountBeforeTest; - private static long sslReceiveBytesCountBeforeTest; + private static long transmissionSendBytesCountBeforeTest; + private static long transmissionReceiveBytesCountBeforeTest; /** * Running for both regular and encrypted blobs @@ -104,21 +104,17 @@ public static void cleanup() throws IOException { @Before public void before() { Map meters = routerMetricRegistry.getMeters(); - sslSendBytesCountBeforeTest = meters.get(sslSendBytesMetricName).getCount(); - sslReceiveBytesCountBeforeTest = meters.get(sslReceiveBytesMetricName).getCount(); + transmissionSendBytesCountBeforeTest = meters.get(transmissionSendBytesMetricName).getCount(); + transmissionReceiveBytesCountBeforeTest = meters.get(transmissionReceiveBytesMetricName).getCount(); } @After public void after() { Map meters = routerMetricRegistry.getMeters(); - Assert.assertTrue("Router should have sent over SSL", - meters.get(sslSendBytesMetricName).getCount() != sslSendBytesCountBeforeTest); - Assert.assertTrue("Router should have received over SSL", - meters.get(sslReceiveBytesMetricName).getCount() != sslReceiveBytesCountBeforeTest); - Assert.assertTrue("Router should not have sent over Plain Text", - meters.get(plaintextSendBytesMetricName).getCount() == 0); - Assert.assertTrue("Router should not have received over Plain Text", - meters.get(plaintextReceiveBytesMetricName).getCount() == 0); + Assert.assertTrue("Router should have been sent", + meters.get(transmissionSendBytesMetricName).getCount() != transmissionSendBytesCountBeforeTest); + Assert.assertTrue("Router should have been sent", + meters.get(transmissionReceiveBytesMetricName).getCount() != transmissionReceiveBytesCountBeforeTest); } /** diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/RouterServerTestFramework.java b/ambry-server/src/integration-test/java/com.github.ambry.server/RouterServerTestFramework.java index 7e505228de..0ca3c6f9be 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/RouterServerTestFramework.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/RouterServerTestFramework.java @@ -75,10 +75,8 @@ class RouterServerTestFramework { final InMemAccountService accountService = new InMemAccountService(false, true); - public static String sslSendBytesMetricName = Selector.class.getName() + ".SslSendBytesRate"; - public static String sslReceiveBytesMetricName = Selector.class.getName() + ".SslReceiveBytesRate"; - public static String plaintextSendBytesMetricName = Selector.class.getName() + ".PlaintextSendBytesRate"; - public static String plaintextReceiveBytesMetricName = Selector.class.getName() + ".PlaintextReceiveBytesRate"; + public static String transmissionSendBytesMetricName = Selector.class.getName() + ".TransmissionSendBytesRate"; + public static String transmissionReceiveBytesMetricName = Selector.class.getName() + ".TransmissionReceiveBytesRate"; /** * Instantiate a framework for testing router-server interaction. Creates a non-blocking router to interact with the