Skip to content

Commit

Permalink
Merge #3211 into 1.2.0-M2
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Apr 30, 2024
2 parents 2aac369 + a2b1828 commit 1c65384
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

import java.net.SocketAddress;
import java.time.Duration;
Expand All @@ -54,12 +56,21 @@ abstract class AbstractHttpServerMetricsHandler extends ChannelDuplexHandler {
boolean channelActivated;
boolean channelOpened;

ContextView contextView;

long dataReceived;
long dataReceivedTime;

long dataSent;
long dataSentTime;

boolean initialized;

String method;
String path;
SocketAddress remoteSocketAddress;
String status;

final Function<String, String> methodTagValue;
final Function<String, String> uriTagValue;

Expand All @@ -73,10 +84,16 @@ protected AbstractHttpServerMetricsHandler(
protected AbstractHttpServerMetricsHandler(AbstractHttpServerMetricsHandler copy) {
this.channelActivated = copy.channelActivated;
this.channelOpened = copy.channelOpened;
this.contextView = copy.contextView;
this.dataReceived = copy.dataReceived;
this.dataReceivedTime = copy.dataReceivedTime;
this.dataSent = copy.dataSent;
this.dataSentTime = copy.dataSentTime;
this.initialized = copy.initialized;
this.method = copy.method;
this.path = copy.path;
this.remoteSocketAddress = copy.remoteSocketAddress;
this.status = copy.status;
this.methodTagValue = copy.methodTagValue;
this.uriTagValue = copy.uriTagValue;
}
Expand Down Expand Up @@ -144,8 +161,19 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
ChannelOperations<?, ?> channelOps = ChannelOperations.get(ctx.channel());
if (channelOps instanceof HttpServerOperations) {
HttpServerOperations ops = (HttpServerOperations) channelOps;
startWrite(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path),
methodTagValue.apply(ops.method().name()), ops.status().codeAsText().toString());
if (!initialized) {
method = methodTagValue.apply(ops.method().name());
path = uriTagValue == null ? ops.path : uriTagValue.apply(ops.path);
// Always take the remote address from the operations in order to consider proxy information
// Use remoteSocketAddress() in order to obtain UDS info
remoteSocketAddress = ops.remoteSocketAddress();
initialized = true;
}
if (contextView == null) {
contextView(ops);
}
status = ops.status().codeAsText().toString();
startWrite(ops);
}
}

Expand All @@ -157,8 +185,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
if (channelOps instanceof HttpServerOperations) {
HttpServerOperations ops = (HttpServerOperations) channelOps;
try {
recordWrite(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path),
methodTagValue.apply(ops.method().name()), ops.status().codeAsText().toString());
recordWrite(ops);
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
Expand All @@ -169,8 +196,6 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)

recordInactiveConnectionOrStream(ctx.channel(), ops);
}

dataSent = 0;
});
}
}
Expand All @@ -188,12 +213,20 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
HttpServerOperations ops = null;
try {
if (msg instanceof HttpRequest) {
reset();
ChannelOperations<?, ?> channelOps = ChannelOperations.get(ctx.channel());
if (channelOps instanceof HttpServerOperations) {
HttpServerOperations ops = (HttpServerOperations) channelOps;
startRead(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path), methodTagValue.apply(ops.method().name()));
ops = (HttpServerOperations) channelOps;
method = methodTagValue.apply(ops.method().name());
path = uriTagValue == null ? ops.path : uriTagValue.apply(ops.path);
// Always take the remote address from the operations in order to consider proxy information
// Use remoteSocketAddress() in order to obtain UDS info
remoteSocketAddress = ops.remoteSocketAddress();
initialized = true;
startRead(ops);

channelActivated = true;
if (ctx.channel() instanceof Http2StreamChannel) {
Expand All @@ -214,13 +247,7 @@ else if (ctx.channel() instanceof SocketChannel) {
dataReceived += extractProcessedDataFromBuffer(msg);

if (msg instanceof LastHttpContent) {
ChannelOperations<?, ?> channelOps = ChannelOperations.get(ctx.channel());
if (channelOps instanceof HttpServerOperations) {
HttpServerOperations ops = (HttpServerOperations) channelOps;
recordRead(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path), methodTagValue.apply(ops.method().name()));
}

dataReceived = 0;
recordRead();
}
}
catch (RuntimeException e) {
Expand All @@ -231,17 +258,17 @@ else if (ctx.channel() instanceof SocketChannel) {
}

ctx.fireChannelRead(msg);

if (ops != null) {
// ContextView is available only when a subscription to the I/O Handler happens
contextView(ops);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
try {
ChannelOperations<?, ?> channelOps = ChannelOperations.get(ctx.channel());
if (channelOps instanceof HttpServerOperations) {
HttpServerOperations ops = (HttpServerOperations) channelOps;
// Always take the remote address from the operations in order to consider proxy information
recordException(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path));
}
recordException();
}
catch (RuntimeException e) {
// Allow request-response exchange to continue, unaffected by metrics problem
Expand All @@ -265,21 +292,25 @@ else if (msg instanceof ByteBuf) {

protected abstract HttpServerMetricsRecorder recorder();

protected void recordException(HttpServerOperations ops, String path) {
protected void contextView(HttpServerOperations ops) {
this.contextView = Context.empty();
}

protected void recordException() {
// Always take the remote address from the operations in order to consider proxy information
// Use remoteSocketAddress() in order to obtain UDS info
recorder().incrementErrorsCount(ops.remoteSocketAddress(), path);
recorder().incrementErrorsCount(remoteSocketAddress, path);
}

protected void recordRead(HttpServerOperations ops, String path, String method) {
protected void recordRead() {
recorder().recordDataReceivedTime(path, method, Duration.ofNanos(System.nanoTime() - dataReceivedTime));

// Always take the remote address from the operations in order to consider proxy information
// Use remoteSocketAddress() in order to obtain UDS info
recorder().recordDataReceived(ops.remoteSocketAddress(), path, dataReceived);
recorder().recordDataReceived(remoteSocketAddress, path, dataReceived);
}

protected void recordWrite(HttpServerOperations ops, String path, String method, String status) {
protected void recordWrite(HttpServerOperations ops) {
Duration dataSentTimeDuration = Duration.ofNanos(System.nanoTime() - dataSentTime);
recorder().recordDataSentTime(path, method, status, dataSentTimeDuration);

Expand All @@ -292,7 +323,7 @@ protected void recordWrite(HttpServerOperations ops, String path, String method,

// Always take the remote address from the operations in order to consider proxy information
// Use remoteSocketAddress() in order to obtain UDS info
recorder().recordDataSent(ops.remoteSocketAddress(), path, dataSent);
recorder().recordDataSent(remoteSocketAddress, path, dataSent);
}

protected void recordActiveConnection(SocketAddress localAddress) {
Expand All @@ -311,11 +342,11 @@ protected void recordClosedStream(SocketAddress localAddress) {
recorder().recordStreamClosed(localAddress);
}

protected void startRead(HttpServerOperations ops, String path, String method) {
protected void startRead(HttpServerOperations ops) {
dataReceivedTime = System.nanoTime();
}

protected void startWrite(HttpServerOperations ops, String path, String method, String status) {
protected void startWrite(HttpServerOperations ops) {
dataSentTime = System.nanoTime();
}

Expand Down Expand Up @@ -346,6 +377,20 @@ else if (channel instanceof SocketChannel) {
}
}

void reset() {
// There is no need to reset 'channelActivated' and 'channelOpened'
contextView = null;
dataReceived = 0;
dataReceivedTime = 0;
dataSent = 0;
dataSentTime = 0;
initialized = false;
method = null;
path = null;
remoteSocketAddress = null;
status = null;
}

static final Set<String> STANDARD_METHODS;
static {
Set<String> standardMethods = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,29 @@ protected ContextAwareHttpServerMetricsRecorder recorder() {
}

@Override
protected void recordException(HttpServerOperations ops, String path) {
protected void contextView(HttpServerOperations ops) {
this.contextView = ops.currentContext();
}

@Override
protected void recordException() {
// Always take the remote address from the operations in order to consider proxy information
// Use remoteSocketAddress() in order to obtain UDS info
recorder().incrementErrorsCount(ops.currentContext(), ops.remoteSocketAddress(), path);
recorder().incrementErrorsCount(contextView, remoteSocketAddress, path);
}

@Override
protected void recordRead(HttpServerOperations ops, String path, String method) {
ContextView contextView = ops.currentContext();
protected void recordRead() {
recorder().recordDataReceivedTime(contextView, path, method,
Duration.ofNanos(System.nanoTime() - dataReceivedTime));

// Always take the remote address from the operations in order to consider proxy information
// Use remoteSocketAddress() in order to obtain UDS info
recorder().recordDataReceived(contextView, ops.remoteSocketAddress(), path, dataReceived);
recorder().recordDataReceived(contextView, remoteSocketAddress, path, dataReceived);
}

@Override
protected void recordWrite(HttpServerOperations ops, String path, String method, String status) {
ContextView contextView = ops.currentContext();
protected void recordWrite(HttpServerOperations ops) {
Duration dataSentTimeDuration = Duration.ofNanos(System.nanoTime() - dataSentTime);
recorder().recordDataSentTime(contextView, path, method, status, dataSentTimeDuration);

Expand All @@ -83,6 +86,6 @@ protected void recordWrite(HttpServerOperations ops, String path, String method,

// Always take the remote address from the operations in order to consider proxy information
// Use remoteSocketAddress() in order to obtain UDS info
recorder().recordDataSent(contextView, ops.remoteSocketAddress(), path, dataSent);
recorder().recordDataSent(contextView, remoteSocketAddress, path, dataSent);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ protected HttpServerMetricsRecorder recorder() {
}

@Override
protected void recordWrite(HttpServerOperations ops, String path, String method, String status) {
protected void recordWrite(HttpServerOperations ops) {
Duration dataSentTimeDuration = Duration.ofNanos(System.nanoTime() - dataSentTime);
recorder().recordDataSentTime(path, method, status, dataSentTimeDuration);

// Always take the remote address from the operations in order to consider proxy information
// Use remoteSocketAddress() in order to obtain UDS info
recorder().recordDataSent(ops.remoteSocketAddress(), path, dataSent);
recorder().recordDataSent(remoteSocketAddress, path, dataSent);

// Cannot invoke the recorder anymore:
// 1. The recorder is one instance only, it is invoked for all requests that can happen
Expand All @@ -108,8 +108,8 @@ protected void recordWrite(HttpServerOperations ops, String path, String method,
}

@Override
protected void startRead(HttpServerOperations ops, String path, String method) {
super.startRead(ops, path, method);
protected void startRead(HttpServerOperations ops) {
super.startRead(ops);

responseTimeHandlerContext = new ResponseTimeHandlerContext(recorder, method, path, ops);
responseTimeObservation = Observation.createNotStarted(this.responseTimeName, responseTimeHandlerContext, OBSERVATION_REGISTRY);
Expand All @@ -119,8 +119,8 @@ protected void startRead(HttpServerOperations ops, String path, String method) {

// response
@Override
protected void startWrite(HttpServerOperations ops, String path, String method, String status) {
super.startWrite(ops, path, method, status);
protected void startWrite(HttpServerOperations ops) {
super.startWrite(ops);

if (responseTimeObservation == null) {
responseTimeHandlerContext = new ResponseTimeHandlerContext(recorder, method, path, ops);
Expand Down

0 comments on commit 1c65384

Please sign in to comment.