Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into fix-redis-test-text…
Browse files Browse the repository at this point in the history
…-file-busy
  • Loading branch information
felixbarny committed Nov 8, 2021
2 parents 1d4c5a5 + 96c5ec6 commit 19dabd5
Show file tree
Hide file tree
Showing 38 changed files with 918 additions and 335 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ by the agent may be different. This was done in order to improve the integration
* Update to async-profiler 1.8.7 and set configured `safemode` at load time through a new system property - {pull}2165[#2165]
* Added support to capture `context.message.routing-key` in rabbitmq, spring amqp instrumentations - {pull}1767[#1767]
* Breakdown metrics are now tracked per service (when using APM Server 8.0) - {pull}2208[#2208]
* Add support for Spring AMQP batch API - {pull}1716[#1716]
[float]
===== Performance improvements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void setUp(Blackhole blackhole) throws IOException {
@TearDown
public void tearDown() throws ExecutionException, InterruptedException {
Thread.sleep(1000);
tracer.getReporter().flush().get();
tracer.getReporter().flush();
server.stop();
System.out.println("Reported: " + tracer.getReporter().getReported());
System.out.println("Dropped: " + tracer.getReporter().getDropped());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,7 @@ public String getPluginsDir() {
}
}

public long geMetadataDiscoveryTimeoutMs() {
public long getMetadataDiscoveryTimeoutMs() {
return metadataTimeoutMs.get().getMillis();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public static Future<MetaData> create(ConfigurationRegistry configurationRegistr
}

final ThreadPoolExecutor executor = ExecutorUtils.createThreadDaemonPool("metadata", 2, 3);
final int metadataDiscoveryTimeoutMs = (int) coreConfiguration.geMetadataDiscoveryTimeoutMs();
final int metadataDiscoveryTimeoutMs = (int) coreConfiguration.getMetadataDiscoveryTimeoutMs();

try {
// System info creation executes external processes for hostname discovery and reads files for container/k8s metadata discovery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class AbstractIntakeApiHandler {
protected OutputStream os;
protected int errorCount;
protected volatile boolean shutDown;
private volatile boolean healthy = true;
private long requestStartedNanos;

public AbstractIntakeApiHandler(ReporterConfiguration reporterConfiguration, PayloadSerializer payloadSerializer, ApmServerClient apmServerClient) {
this.reporterConfiguration = reporterConfiguration;
Expand Down Expand Up @@ -106,10 +108,11 @@ protected HttpURLConnection startRequest(String endpoint) throws Exception {
connection.setRequestProperty("Content-Type", "application/x-ndjson");
connection.setUseCaches(false);
connection.connect();
os = new DeflaterOutputStream(connection.getOutputStream(), deflater);
os = new DeflaterOutputStream(connection.getOutputStream(), deflater, true);
payloadSerializer.setOutputStream(os);
payloadSerializer.appendMetaDataNdJsonToStream();
payloadSerializer.flushToOutputStream();
requestStartedNanos = System.nanoTime();
} catch (IOException e) {
logger.error("Error trying to connect to APM Server at {}. Some details about SSL configurations corresponding " +
"the current connection are logged at INFO level.", connection.getURL());
Expand Down Expand Up @@ -184,6 +187,10 @@ public void endRequest() {
}
}

/**
 * Tells whether the current request has been open for at least the configured API request time.
 * <p>
 * The check compares the elapsed time ({@code now - requestStartedNanos}) against the limit instead of
 * comparing {@link System#nanoTime()} with an absolute deadline. {@code nanoTime()} values may be negative
 * or wrap around, so only differences between two readings are meaningful.
 * </p>
 *
 * @return {@code true} if at least {@code reporterConfiguration.getApiRequestTime()} has elapsed since
 * {@code requestStartedNanos} was recorded
 */
protected boolean isApiRequestTimeExpired() {
    final long elapsedNanos = System.nanoTime() - requestStartedNanos;
    final long limitNanos = TimeUnit.MILLISECONDS.toNanos(reporterConfiguration.getApiRequestTime().getMillis());
    return elapsedNanos >= limitNanos;
}

protected void onRequestError(Integer responseCode, InputStream inputStream, @Nullable IOException e) {
// TODO read accepted, dropped and invalid
onConnectionError(responseCode, currentlyTransmitting, 0);
Expand Down Expand Up @@ -212,21 +219,32 @@ protected void onConnectionError(@Nullable Integer responseCode, long droppedEve
"Please use APM Server 6.5.0 or newer.");
}

backoff();
}

/**
 * Pauses the calling thread after a connection error.
 * <p>
 * The delay is obtained from {@code getBackoffTimeSeconds} for the current error count (which is incremented
 * as a side effect) and extended by {@code getRandomJitter}. While waiting on {@code WAIT_LOCK}, the handler
 * is flagged as unhealthy; the flag is always restored when the wait ends, whether it timed out,
 * was woken up, or was interrupted.
 * </p>
 */
private void backoff() {
    final long delaySeconds = getBackoffTimeSeconds(errorCount++);
    logger.info("Backing off for {} seconds (+/-10%)", delaySeconds);
    final long delayMillis = TimeUnit.SECONDS.toMillis(delaySeconds);
    if (delayMillis <= 0) {
        // nothing to wait for
        return;
    }
    // back off because there are connection issues with the apm server
    try {
        healthy = false;
        synchronized (WAIT_LOCK) {
            final long waitMillis = delayMillis + getRandomJitter(delayMillis);
            WAIT_LOCK.wait(waitMillis);
        }
    } catch (InterruptedException e) {
        logger.info("APM Agent ReportingEventHandler had been interrupted", e);
    } finally {
        healthy = true;
    }
}

/**
 * Returns the current value of the {@code healthy} flag.
 * <p>
 * The flag is initially {@code true}. It is set to {@code false} while {@code backoff()} waits after a
 * connection error and set back to {@code true} once that wait ends.
 * </p>
 *
 * @return {@code false} while this handler is backing off, {@code true} otherwise
 */
public boolean isHealthy() {
return healthy;
}

/**
 * Returns the current value of the {@code reported} field.
 * NOTE(review): presumably the number of events successfully sent to the APM server - confirm where the field is updated.
 *
 * @return the value of {@code reported}
 */
public long getReported() {
return reported;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,17 @@
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Future;
import javax.annotation.Nullable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

/**
* This reporter asynchronously reports {@link Transaction}s to the APM server
Expand All @@ -62,10 +63,17 @@ public void translateTo(ReportingEvent event, long sequence, Span s) {
event.setSpan(s);
}
};
private static final EventTranslator<ReportingEvent> FLUSH_EVENT_TRANSLATOR = new EventTranslator<ReportingEvent>() {
// Turns a ring buffer slot into an end-request event and records the thread (may be null)
// that should be unparked once the event has been processed.
private static final EventTranslatorOneArg<ReportingEvent, Thread> END_REQUEST_EVENT_TRANSLATOR = new EventTranslatorOneArg<ReportingEvent, Thread>() {
@Override
public void translateTo(ReportingEvent event, long sequence, @Nullable Thread unparkAfterProcessed) {
event.setEndRequestEvent();
// hand the waiting thread to the event so it can be woken up after processing
event.unparkAfterProcessed(unparkAfterProcessed);
}
};
private static final EventTranslator<ReportingEvent> WAKEUP_EVENT_TRANSLATOR = new EventTranslator<ReportingEvent>() {
@Override
public void translateTo(ReportingEvent event, long sequence) {
event.setFlushEvent();
event.setWakeupEvent();
}
};
private static final EventTranslatorOneArg<ReportingEvent, ErrorCapture> ERROR_EVENT_TRANSLATOR = new EventTranslatorOneArg<ReportingEvent, ErrorCapture>() {
Expand All @@ -80,10 +88,11 @@ public void translateTo(ReportingEvent event, long sequence, JsonWriter jsonWrit
event.setJsonWriter(jsonWriter);
}
};
private static final EventTranslator<ReportingEvent> SHUTDOWN_EVENT_TRANSLATOR = new EventTranslator<ReportingEvent>() {
private static final EventTranslatorOneArg<ReportingEvent, Thread> SHUTDOWN_EVENT_TRANSLATOR = new EventTranslatorOneArg<ReportingEvent, Thread>() {
@Override
public void translateTo(ReportingEvent event, long sequence) {
public void translateTo(ReportingEvent event, long sequence, @Nullable Thread unparkAfterProcessed) {
event.shutdownEvent();
event.unparkAfterProcessed(Thread.currentThread());
}
};

Expand Down Expand Up @@ -123,7 +132,7 @@ public void report(Transaction transaction) {
transaction.decrementReferences();
}
if (syncReport) {
waitForFlush();
flush();
}
}

Expand All @@ -133,16 +142,13 @@ public void report(Span span) {
span.decrementReferences();
}
if (syncReport) {
waitForFlush();
flush();
}
}

private void waitForFlush() {
try {
flush().get();
} catch (Exception e) {
throw new RuntimeException(e);
}
/**
 * Flushes without a deadline by delegating to {@link #flush(long, TimeUnit)} with a negative timeout.
 * A negative timeout is translated into a threshold of {@code Long.MAX_VALUE} by the waiting logic in this class.
 *
 * @return the result of {@link #flush(long, TimeUnit)}
 */
@Override
public boolean flush() {
return flush(-1, TimeUnit.NANOSECONDS);
}

@Override
Expand All @@ -155,83 +161,74 @@ public long getReported() {
return reportingEventHandler.getReported();
}

/**
* Flushes pending {@link ErrorCapture}s and {@link Transaction}s to the APM server.
* <p>
* This method may block for a while until a slot in the ring buffer becomes available.
* </p>
*
* @return A {@link Future} which resolves when the flush has been executed.
*/
@Override
public Future<Void> flush() {
final boolean success = disruptor.getRingBuffer().tryPublishEvent(FLUSH_EVENT_TRANSLATOR);
if (!success) {
throw new IllegalStateException("Ring buffer has no available slots");
}
final long cursor = disruptor.getCursor();
return new Future<Void>() {
private volatile boolean cancelled = false;

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (isDone()) {
return false;
}
disruptor.get(cursor).resetState();
// the volatile write also ensures visibility of the resetState() in other threads
cancelled = true;
return true;
}

@Override
public boolean isCancelled() {
return cancelled;
}

@Override
public boolean isDone() {
return isEventProcessed(cursor);
}
/**
 * Tries to publish a wakeup event to the ring buffer.
 * The boolean result of {@code tryPublishEvent} is ignored, so nothing is reported to the caller
 * if the event could not be published.
 */
public void scheduleWakeupEvent() {
disruptor.getRingBuffer().tryPublishEvent(WAKEUP_EVENT_TRANSLATOR);
}

@Override
public Void get() throws InterruptedException {
while (!isDone()) {
Thread.sleep(1);
}
return null;
}
/**
 * Publishes an end-request event and waits until it has been processed or the timeout elapses.
 *
 * @param timeout maximum time to wait; a negative value means there is no deadline
 * @param unit    unit of {@code timeout}
 * @return the result of {@code publishAndWaitForEvent}: {@code false} if the reporting event handler is
 * unhealthy or the event was not processed in time
 */
@Override
public boolean flush(long timeout, TimeUnit unit) {
return publishAndWaitForEvent(timeout, unit, END_REQUEST_EVENT_TRANSLATOR);
}

/*
* This might not be a very elegant or efficient implementation, but it is only intended to be used in tests anyway
*/
@Override
public Void get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
for (; timeout > 0 && !isDone(); timeout--) {
Thread.sleep(1);
private boolean publishAndWaitForEvent(long timeout, TimeUnit unit, EventTranslatorOneArg<ReportingEvent, Thread> eventTranslator) {
if (!reportingEventHandler.isHealthy()) {
return false;
}
ReportingEventHandler reportingEventHandler = this.reportingEventHandler;
long startNs = System.nanoTime();
long thresholdNs;
if (timeout < 0) {
thresholdNs = Long.MAX_VALUE;
} else {
thresholdNs = unit.toNanos(timeout) + startNs;
}
do {
try {
long sequence = disruptor.getRingBuffer().tryNext();
try {
eventTranslator.translateTo(disruptor.get(sequence), sequence, Thread.currentThread());
} finally {
disruptor.getRingBuffer().publish(sequence);
}
if (!isDone()) {
throw new TimeoutException();
return waitForEventProcessed(sequence, thresholdNs);
} catch (InsufficientCapacityException e) {
LockSupport.parkNanos(100_000);
if (Thread.currentThread().isInterrupted()) {
break;
}
return null;
}
};
} while (System.nanoTime() < thresholdNs && reportingEventHandler.isHealthy());
return false;
}

private boolean isEventProcessed(long sequence) {
return disruptor.getSequenceValueFor(reportingEventHandler) >= sequence;
/**
 * Parks the calling thread until the event at {@code sequence} has been processed.
 * <p>
 * Waiting stops early when the deadline is reached, when the reporting event handler turns unhealthy,
 * or when the calling thread is found to be interrupted (the interrupt flag is left set).
 * </p>
 *
 * @param sequence    ring buffer sequence of the event to wait for
 * @param thresholdNs deadline, expressed as a {@link System#nanoTime()} value
 * @return whether the handler reports {@code sequence} as processed when this method returns
 */
private boolean waitForEventProcessed(long sequence, long thresholdNs) {
    final ReportingEventHandler handler = this.reportingEventHandler;
    // Upper bound for a single park: wake up periodically to check whether the connection
    // turned unhealthy after the event has been published to the ring buffer.
    final long maxParkNs = 10_000_000;
    while (true) {
        final long nowNs = System.nanoTime();
        if (nowNs >= thresholdNs || !handler.isHealthy() || handler.isProcessed(sequence)) {
            break;
        }
        // after the event has been processed, this thread will be unparked and we'll return immediately
        LockSupport.parkNanos(Math.min(maxParkNs, thresholdNs - nowNs));
        if (Thread.currentThread().isInterrupted()) {
            break;
        }
    }
    return handler.isProcessed(sequence);
}

@Override
public void close() {
logger.info("dropped events because of full queue: {}", dropped.get());
disruptor.getRingBuffer().tryPublishEvent(SHUTDOWN_EVENT_TRANSLATOR);
publishAndWaitForEvent(5, TimeUnit.SECONDS, SHUTDOWN_EVENT_TRANSLATOR);
reportingEventHandler.close();
try {
disruptor.shutdown(5, TimeUnit.SECONDS);
disruptor.shutdown(1, TimeUnit.SECONDS);
} catch (com.lmax.disruptor.TimeoutException e) {
logger.warn("Timeout while shutting down disruptor");
}
reportingEventHandler.close();
}

@Override
Expand All @@ -240,7 +237,7 @@ public void report(ErrorCapture error) {
error.recycle();
}
if (syncReport) {
waitForFlush();
flush();
}
}

Expand All @@ -251,7 +248,7 @@ public void report(JsonWriter jsonWriter) {
}
tryAddEventToRingBuffer(jsonWriter, JSON_WRITER_EVENT_TRANSLATOR);
if (syncReport) {
waitForFlush();
flush();
}
}

Expand Down
Loading

0 comments on commit 19dabd5

Please sign in to comment.