Skip to content

Commit

Permalink
Capture request body for HttpUrlConnection (#3724)
Browse files Browse the repository at this point in the history
  • Loading branch information
JonasKunz authored Aug 13, 2024
1 parent 8014890 commit e306c60
Show file tree
Hide file tree
Showing 17 changed files with 533 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,30 @@
import co.elastic.apm.agent.impl.context.SpanContextImpl;
import co.elastic.apm.agent.impl.context.UrlImpl;
import co.elastic.apm.agent.impl.stacktrace.StacktraceConfigurationImpl;
import co.elastic.apm.agent.tracer.Span;
import co.elastic.apm.agent.tracer.util.ResultUtil;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
import co.elastic.apm.agent.tracer.Outcome;
import co.elastic.apm.agent.tracer.Span;
import co.elastic.apm.agent.tracer.SpanEndListener;
import co.elastic.apm.agent.tracer.pooling.Recyclable;
import co.elastic.apm.agent.tracer.util.ResultUtil;
import co.elastic.apm.agent.util.CharSequenceUtils;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class SpanImpl extends AbstractSpanImpl<SpanImpl> implements Recyclable, Span<SpanImpl> {

/**
* Protection against excessive memory usage and span ending run times:
* We limit the maximum allowed number of end listeners.
*/
static final int MAX_END_LISTENERS = 100;
private static final Logger logger = LoggerFactory.getLogger(SpanImpl.class);
public static final long MAX_LOG_INTERVAL_MICRO_SECS = TimeUnit.MINUTES.toMicros(5);
private static long lastSpanMaxWarningTimestamp;
Expand Down Expand Up @@ -75,6 +84,9 @@ public class SpanImpl extends AbstractSpanImpl<SpanImpl> implements Recyclable,
@Nullable
private List<StackFrame> stackFrames;

private final Set<SpanEndListener<? super SpanImpl>> endListeners =
Collections.newSetFromMap(new ConcurrentHashMap<SpanEndListener<? super SpanImpl>, Boolean>());

/**
* If a span is non-discardable, all the spans leading up to it are non-discardable as well
*/
Expand Down Expand Up @@ -174,6 +186,25 @@ public SpanImpl withAction(@Nullable String action) {
return this;
}

@Override
public void addEndListener(SpanEndListener<? super SpanImpl> listener) {
if (endListeners.size() < MAX_END_LISTENERS) {
endListeners.add(listener);
} else {
if (logger.isDebugEnabled()) {
logger.warn("Not adding span end listener because limit is reached: {}," +
" throwable stacktrace will be added for debugging", listener, new Throwable());
} else {
logger.warn("Not adding span end listener because limit is reached: {}", listener);
}
}
}

@Override
public void removeEndListener(SpanEndListener<? super SpanImpl> listener) {
endListeners.remove(listener);
}


/**
* Sets span.type, span.subtype and span.action. If no subtype and action are provided, assumes the legacy usage of hierarchical
Expand Down Expand Up @@ -221,6 +252,9 @@ public String getAction() {

@Override
public void beforeEnd(long epochMicros) {
for (SpanEndListener<? super SpanImpl> endListener : endListeners) {
endListener.onEnd(this);
}
// set outcome when not explicitly set by user nor instrumentation
if (outcomeNotSet()) {
Outcome outcome;
Expand Down Expand Up @@ -476,6 +510,7 @@ public void resetState() {
super.resetState();
context.resetState();
composite.resetState();
endListeners.clear();
stacktrace = null;
subtype = null;
action = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,25 @@
import co.elastic.apm.agent.impl.sampling.ConstantSampler;
import co.elastic.apm.agent.objectpool.TestObjectPoolFactory;
import co.elastic.apm.agent.tracer.Outcome;
import co.elastic.apm.agent.tracer.Span;
import co.elastic.apm.agent.tracer.SpanEndListener;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;

public class SpanTest {

Expand Down Expand Up @@ -87,6 +93,64 @@ void testOutcomeExplicitlyToUnknown() {
assertThat(span.getOutcome()).isEqualTo(Outcome.UNKNOWN);
}

@Test
void checkEndListenersConcurrencySafe() {
TransactionImpl transaction = new TransactionImpl(tracer);
transaction.startRoot(0, ConstantSampler.of(true), BaggageImpl.EMPTY);
try {
SpanImpl span = new SpanImpl(tracer);
span.start(TraceContextImpl.fromParent(), transaction, BaggageImpl.EMPTY, -1L);

AtomicInteger invocationCounter = new AtomicInteger();
SpanEndListener<Span<?>> callback = new SpanEndListener<Span<?>>() {
@Override
public void onEnd(Span<?> span) {
span.removeEndListener(this);
invocationCounter.incrementAndGet();
}
};
span.addEndListener(callback);
span.end();
assertThat(invocationCounter.get()).isEqualTo(1);
} finally {
transaction.end();
}

}

@Test
@SuppressWarnings("unchecked")
void checkEndListenersLimit() {
TransactionImpl transaction = new TransactionImpl(tracer);
transaction.startRoot(0, ConstantSampler.of(true), BaggageImpl.EMPTY);
try {
SpanImpl span = new SpanImpl(tracer);
span.start(TraceContextImpl.fromParent(), transaction, BaggageImpl.EMPTY, -1L);

for (int i = 0; i < SpanImpl.MAX_END_LISTENERS - 1; i++) {
span.addEndListener(new SpanEndListener<SpanImpl>() {
@Override
public void onEnd(SpanImpl span) {

}
});
}

SpanEndListener<SpanImpl> invokeMe = (SpanEndListener<SpanImpl>) Mockito.mock(SpanEndListener.class);
SpanEndListener<SpanImpl> dontInvokeMe = (SpanEndListener<SpanImpl>) Mockito.mock(SpanEndListener.class);
span.addEndListener(invokeMe);
span.addEndListener(dontInvokeMe);

span.end();

verify(invokeMe).onEnd(span);
verifyNoInteractions(dontInvokeMe);
} finally {
transaction.end();
}

}

@Test
void normalizeEmptyFields() {
SpanImpl span = new SpanImpl(tracer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,17 @@ public static <T> CoderResult decodeUtf8BytesFromSource(ByteSourceReader<T> read
}
}

@Nullable
public static byte[] copyToByteArray(@Nullable ByteBuffer buf) {
if (buf == null) {
return null;
}
byte[] data = new byte[buf.position()];
buf.position(0);
buf.get(data);
return data;
}

public interface ByteSourceReader<S> {
int availableBytes(S source);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public boolean isNestedCallAndDecrement() {
return decrement() != 0;
}

private int get() {
public int get() {
Integer callDepthForCurrentThread = callDepthPerThread.get();
if (callDepthForCurrentThread == null) {
callDepthForCurrentThread = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ protected boolean isBodyCapturingSupported() {
return true;
}

@Override
public void testPostBodyCaptureForExistingSpan() throws Exception {
//TODO: async http client instrumentation does not support capturing bodies for existing spans yet
}

@Override
protected void performPost(String path, byte[] data, String contentTypeHeader) throws Exception {
final CompletableFuture<HttpResponse> responseFuture = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package co.elastic.apm.agent.httpclient;

import co.elastic.apm.agent.tracer.Span;
import co.elastic.apm.agent.tracer.SpanEndListener;
import co.elastic.apm.agent.tracer.metadata.BodyCapture;

class RequestBodyRecordingHelper implements SpanEndListener<Span<?>> {

/**
* We do not need to participate in span reference counting here.
* Instead, we only hold a reference to the span for the time it is not ended.
* This is ensured via the {@link #onEnd(Span)} callback.
*/
// Visible for testing
Span<?> clientSpan;

public RequestBodyRecordingHelper(Span<?> clientSpan) {
if (!clientSpan.isFinished()) {
this.clientSpan = clientSpan;
clientSpan.addEndListener(this);
}
}

void appendToBody(byte b) {
if (clientSpan != null) {
BodyCapture requestBody = clientSpan.getContext().getHttp().getRequestBody();
requestBody.append(b);
if (requestBody.isFull()) {
releaseSpan();
}
}
}

void appendToBody(byte[] b, int off, int len) {
if (clientSpan != null) {
BodyCapture requestBody = clientSpan.getContext().getHttp().getRequestBody();
requestBody.append(b, off, len);
if (requestBody.isFull()) {
releaseSpan();
}
}
}

void releaseSpan() {
if (clientSpan != null) {
clientSpan.removeEndListener(this);
}
clientSpan = null;
}

@Override
public void onEnd(Span<?> span) {
releaseSpan();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,60 +19,29 @@
package co.elastic.apm.agent.httpclient;

import co.elastic.apm.agent.tracer.Span;
import co.elastic.apm.agent.tracer.metadata.BodyCapture;

import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;

public class RequestBodyRecordingInputStream extends InputStream {

private final InputStream delegate;

@Nullable
private Span<?> clientSpan;
private final RequestBodyRecordingHelper recordingHelper;

public RequestBodyRecordingInputStream(InputStream delegate, Span<?> clientSpan) {
this.delegate = delegate;
clientSpan.incrementReferences();
this.clientSpan = clientSpan;
this.recordingHelper = new RequestBodyRecordingHelper(clientSpan);
}


protected void appendToBody(byte b) {
if (clientSpan != null) {
BodyCapture requestBody = clientSpan.getContext().getHttp().getRequestBody();
requestBody.append(b);
if (requestBody.isFull()) {
releaseSpan();
}
}
}

protected void appendToBody(byte[] b, int off, int len) {
if (clientSpan != null) {
BodyCapture requestBody = clientSpan.getContext().getHttp().getRequestBody();
requestBody.append(b, off, len);
if (requestBody.isFull()) {
releaseSpan();
}
}
}

public void releaseSpan() {
if (clientSpan != null) {
clientSpan.decrementReferences();
clientSpan = null;
}
}

@Override
public int read() throws IOException {
int character = delegate.read();
if (character == -1) {
releaseSpan();
recordingHelper.releaseSpan();
} else {
appendToBody((byte) character);
recordingHelper.appendToBody((byte) character);
}
return character;
}
Expand All @@ -81,9 +50,9 @@ public int read() throws IOException {
public int read(byte[] b, int off, int len) throws IOException {
int readBytes = delegate.read(b, off, len);
if (readBytes == -1) {
releaseSpan();
recordingHelper.releaseSpan();
} else {
appendToBody(b, off, readBytes);
recordingHelper.appendToBody(b, off, readBytes);
}
return readBytes;
}
Expand All @@ -96,7 +65,7 @@ public int available() throws IOException {
@Override
public void close() throws IOException {
try {
releaseSpan();
recordingHelper.releaseSpan();
} finally {
delegate.close();
}
Expand Down
Loading

0 comments on commit e306c60

Please sign in to comment.