Skip to content

Commit

Permalink
Add metrics for ResponseWriteAndFlushTimeForDataRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
sushantmane committed Aug 12, 2024
1 parent 37cb75b commit 8d3accf
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public abstract class ReadResponse {
private int hadamardProductCount = 0;
private int countOperatorCount = 0;
private int rcu = 0;
private long responseWriteAndFlushStartTimeNanos = -1;

public void setCompressionStrategy(CompressionStrategy compressionStrategy) {
this.compressionStrategy = compressionStrategy;
Expand Down Expand Up @@ -205,4 +206,16 @@ public int getCountOperatorCount() {
public abstract ByteBuf getResponseBody();

public abstract int getResponseSchemaIdHeader();

public void setResponseWriteAndFlushStartTimeNanos(long responseWriteAndFlushStartTimeNanos) {
this.responseWriteAndFlushStartTimeNanos = responseWriteAndFlushStartTimeNanos;
}

public void recordResponseWriteAndFlushStartTimeNanos() {
this.responseWriteAndFlushStartTimeNanos = System.nanoTime();
}

public long getResponseWriteAndFlushStartTimeNanos() {
return responseWriteAndFlushStartTimeNanos;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public void initChannel(SocketChannel ch) {
ServerConnectionStatsHandler serverConnectionStatsHandler =
new ServerConnectionStatsHandler(serverConnectionStats, nettyStats, serverConfig.getRouterPrincipalName());
pipeline.addLast(serverConnectionStatsHandler);
StatsHandler statsHandler = new StatsHandler(singleGetStats, multiGetStats, computeStats);
StatsHandler statsHandler = new StatsHandler(singleGetStats, multiGetStats, computeStats, nettyStats);
pipeline.addLast(statsHandler);
if (whetherNeedServerCodec) {
pipeline.addLast(new HttpServerCodec());
Expand Down Expand Up @@ -272,7 +272,7 @@ public void initChannel(SocketChannel ch) {
public VeniceServerGrpcRequestProcessor initGrpcRequestProcessor() {
VeniceServerGrpcRequestProcessor grpcServerRequestProcessor = new VeniceServerGrpcRequestProcessor();

StatsHandler statsHandler = new StatsHandler(singleGetStats, multiGetStats, computeStats);
StatsHandler statsHandler = new StatsHandler(singleGetStats, multiGetStats, computeStats, nettyStats);
GrpcStatsHandler grpcStatsHandler = new GrpcStatsHandler(statsHandler);
grpcServerRequestProcessor.addHandler(grpcStatsHandler);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,22 +183,24 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
ctx.writeAndFlush(response);
}

public void setStats(ServerStatsContext statsContext, ReadResponse obj) {
statsContext.setDatabaseLookupLatency(obj.getDatabaseLookupLatency());
statsContext.setStorageExecutionHandlerSubmissionWaitTime(obj.getStorageExecutionHandlerSubmissionWaitTime());
statsContext.setStorageExecutionQueueLen(obj.getStorageExecutionQueueLen());
statsContext.setSuccessRequestKeyCount(obj.getRecordCount());
statsContext.setMultiChunkLargeValueCount(obj.getMultiChunkLargeValueCount());
statsContext.setReadComputeLatency(obj.getReadComputeLatency());
statsContext.setReadComputeDeserializationLatency(obj.getReadComputeDeserializationLatency());
statsContext.setReadComputeSerializationLatency(obj.getReadComputeSerializationLatency());
statsContext.setDotProductCount(obj.getDotProductCount());
statsContext.setCosineSimilarityCount(obj.getCosineSimilarityCount());
statsContext.setHadamardProductCount(obj.getHadamardProductCount());
statsContext.setCountOperatorCount(obj.getCountOperatorCount());
statsContext.setKeySizeList(obj.getKeySizeList());
statsContext.setValueSizeList(obj.getValueSizeList());
statsContext.setValueSize(obj.getValueSize());
statsContext.setReadComputeOutputSize(obj.getReadComputeOutputSize());
public void setStats(ServerStatsContext statsContext, ReadResponse readResponse) {
statsContext.setDatabaseLookupLatency(readResponse.getDatabaseLookupLatency());
statsContext
.setStorageExecutionHandlerSubmissionWaitTime(readResponse.getStorageExecutionHandlerSubmissionWaitTime());
statsContext.setStorageExecutionQueueLen(readResponse.getStorageExecutionQueueLen());
statsContext.setSuccessRequestKeyCount(readResponse.getRecordCount());
statsContext.setMultiChunkLargeValueCount(readResponse.getMultiChunkLargeValueCount());
statsContext.setReadComputeLatency(readResponse.getReadComputeLatency());
statsContext.setReadComputeDeserializationLatency(readResponse.getReadComputeDeserializationLatency());
statsContext.setReadComputeSerializationLatency(readResponse.getReadComputeSerializationLatency());
statsContext.setDotProductCount(readResponse.getDotProductCount());
statsContext.setCosineSimilarityCount(readResponse.getCosineSimilarityCount());
statsContext.setHadamardProductCount(readResponse.getHadamardProductCount());
statsContext.setCountOperatorCount(readResponse.getCountOperatorCount());
statsContext.setKeySizeList(readResponse.getKeySizeList());
statsContext.setValueSizeList(readResponse.getValueSizeList());
statsContext.setValueSize(readResponse.getValueSize());
statsContext.setReadComputeOutputSize(readResponse.getReadComputeOutputSize());
statsContext.setResponseWriteAndFlushStartTimeNanos(readResponse.getResponseWriteAndFlushStartTimeNanos());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class ServerStatsContext {
private int hadamardProductCount = 0;
private int countOperatorCount = 0;
private boolean isRequestTerminatedEarly = false;
private long responseWriteAndFlushStartTimeNanos = -1;

private IntList keySizeList;
private IntList valueSizeList;
Expand Down Expand Up @@ -157,8 +158,8 @@ public void resetContext() {
isRequestTerminatedEarly = false;
isComplete = false;
isMisroutedStoreVersion = false;

newRequest = false;
responseWriteAndFlushStartTimeNanos = -1;
}

public void setFirstPartLatency(double firstPartLatency) {
Expand Down Expand Up @@ -435,4 +436,12 @@ public void setMisroutedStoreVersion(boolean misroutedStoreVersion) {
public boolean isMisroutedStoreVersion() {
return isMisroutedStoreVersion;
}

public void setResponseWriteAndFlushStartTimeNanos(long startTimeNanos) {
responseWriteAndFlushStartTimeNanos = startTimeNanos;
}

public long getResponseWriteAndFlushStartTimeNanos() {
return responseWriteAndFlushStartTimeNanos;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,27 @@
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpResponseStatus;
import it.unimi.dsi.fastutil.ints.IntList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


public class StatsHandler extends ChannelDuplexHandler {
private static final Logger LOGGER = LogManager.getLogger(StatsHandler.class);
private final ServerStatsContext serverStatsContext;
private final AggServerHttpRequestStats singleGetStats;
private final AggServerHttpRequestStats multiGetStats;
private final AggServerHttpRequestStats computeStats;
private final VeniceServerNettyStats nettyStats;

public StatsHandler(
AggServerHttpRequestStats singleGetStats,
AggServerHttpRequestStats multiGetStats,
AggServerHttpRequestStats computeStats) {
AggServerHttpRequestStats computeStats,
VeniceServerNettyStats nettyStats) {
this.singleGetStats = singleGetStats;
this.multiGetStats = multiGetStats;
this.computeStats = computeStats;

this.nettyStats = nettyStats;
this.serverStatsContext = new ServerStatsContext(singleGetStats, multiGetStats, computeStats);
}

Expand Down Expand Up @@ -215,6 +220,11 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
serverStatsContext.errorRequest(serverHttpRequestStats, elapsedTime);
}
serverStatsContext.setStatCallBackExecuted(true);

long responseWriteAndFlushStartTimeNanosStartTime = serverStatsContext.getResponseWriteAndFlushStartTimeNanos();
if (responseWriteAndFlushStartTimeNanosStartTime > 0) {
nettyStats.recordWriteAndFlushCompletionTimeForDataRequest(responseWriteAndFlushStartTimeNanosStartTime);
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ public void channelRead(ChannelHandlerContext context, Object message) throws Ex
response.setStorageExecutionSubmissionWaitTime(submissionWaitTime);
response.setStorageExecutionQueueLen(queueLen);
response.setRCU(ReadQuotaEnforcementHandler.getRcu(request));
response.recordResponseWriteAndFlushStartTimeNanos();
if (request.isStreamingRequest()) {
response.setStreamingResponse();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class VeniceServerNettyStats extends AbstractVeniceStats {
private final AtomicInteger activeReadHandlerThreads = new AtomicInteger();
private final Sensor writeAndFlushTimeOkRequests;
private final Sensor writeAndFlushTimeBadRequests;
private final Sensor writeAndFlushCompletionTimeForDataRequest;

public VeniceServerNettyStats(MetricsRepository metricsRepository, String name) {
super(metricsRepository, name);
Expand Down Expand Up @@ -49,6 +50,16 @@ public VeniceServerNettyStats(MetricsRepository metricsRepository, String name)
new Avg(),
TehutiUtils
.getPercentileStat(getName() + AbstractVeniceStats.DELIMITER + writeAndFlushTimeBadRequestsSensorName));

String responseWriteAndFlushStartTimeNanosSensorName = "WriteAndFlushCompletionTimeForDataRequest";
writeAndFlushCompletionTimeForDataRequest = registerSensorIfAbsent(
responseWriteAndFlushStartTimeNanosSensorName,
new OccurrenceRate(),
new Max(),
new Min(),
new Avg(),
TehutiUtils.getPercentileStat(
getName() + AbstractVeniceStats.DELIMITER + responseWriteAndFlushStartTimeNanosSensorName));
}

public static long getElapsedTimeInMicros(long startTimeNanos) {
Expand Down Expand Up @@ -76,10 +87,14 @@ public int decrementActiveConnections() {
}

public void recordWriteAndFlushTimeOkRequests(long startTimeNanos) {
writeAndFlushTimeOkRequests.record(getElapsedTimeInNanos(startTimeNanos));
writeAndFlushTimeOkRequests.record(getElapsedTimeInMicros(startTimeNanos));
}

public void recordWriteAndFlushTimeBadRequests(long startTimeNanos) {
writeAndFlushTimeBadRequests.record(getElapsedTimeInNanos(startTimeNanos));
writeAndFlushTimeBadRequests.record(getElapsedTimeInMicros(startTimeNanos));
}

public void recordWriteAndFlushCompletionTimeForDataRequest(long startTimeNanos) {
writeAndFlushCompletionTimeForDataRequest.record(getElapsedTimeInMicros(startTimeNanos));
}
}

0 comments on commit 8d3accf

Please sign in to comment.