Skip to content

Commit

Permalink
Improve flush (#2228)
Browse files Browse the repository at this point in the history
  • Loading branch information
felixbarny authored Nov 8, 2021
1 parent 58ae228 commit 96c5ec6
Show file tree
Hide file tree
Showing 12 changed files with 297 additions and 278 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void setUp(Blackhole blackhole) throws IOException {
@TearDown
public void tearDown() throws ExecutionException, InterruptedException {
Thread.sleep(1000);
tracer.getReporter().flush().get();
tracer.getReporter().flush();
server.stop();
System.out.println("Reported: " + tracer.getReporter().getReported());
System.out.println("Dropped: " + tracer.getReporter().getDropped());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class AbstractIntakeApiHandler {
protected OutputStream os;
protected int errorCount;
protected volatile boolean shutDown;
private volatile boolean healthy = true;
private long requestStartedNanos;

public AbstractIntakeApiHandler(ReporterConfiguration reporterConfiguration, PayloadSerializer payloadSerializer, ApmServerClient apmServerClient) {
this.reporterConfiguration = reporterConfiguration;
Expand Down Expand Up @@ -106,10 +108,11 @@ protected HttpURLConnection startRequest(String endpoint) throws Exception {
connection.setRequestProperty("Content-Type", "application/x-ndjson");
connection.setUseCaches(false);
connection.connect();
os = new DeflaterOutputStream(connection.getOutputStream(), deflater);
os = new DeflaterOutputStream(connection.getOutputStream(), deflater, true);
payloadSerializer.setOutputStream(os);
payloadSerializer.appendMetaDataNdJsonToStream();
payloadSerializer.flushToOutputStream();
requestStartedNanos = System.nanoTime();
} catch (IOException e) {
logger.error("Error trying to connect to APM Server at {}. Some details about SSL configurations corresponding " +
"the current connection are logged at INFO level.", connection.getURL());
Expand Down Expand Up @@ -184,6 +187,10 @@ public void endRequest() {
}
}

protected boolean isApiRequestTimeExpired() {
return System.nanoTime() >= requestStartedNanos + TimeUnit.MILLISECONDS.toNanos(reporterConfiguration.getApiRequestTime().getMillis());
}

protected void onRequestError(Integer responseCode, InputStream inputStream, @Nullable IOException e) {
// TODO read accepted, dropped and invalid
onConnectionError(responseCode, currentlyTransmitting, 0);
Expand Down Expand Up @@ -212,21 +219,32 @@ protected void onConnectionError(@Nullable Integer responseCode, long droppedEve
"Please use APM Server 6.5.0 or newer.");
}

backoff();
}

private void backoff() {
long backoffTimeSeconds = getBackoffTimeSeconds(errorCount++);
logger.info("Backing off for {} seconds (+/-10%)", backoffTimeSeconds);
final long backoffTimeMillis = TimeUnit.SECONDS.toMillis(backoffTimeSeconds);
if (backoffTimeMillis > 0) {
// back off because there are connection issues with the apm server
try {
healthy = false;
synchronized (WAIT_LOCK) {
WAIT_LOCK.wait(backoffTimeMillis + getRandomJitter(backoffTimeMillis));
}
} catch (InterruptedException e) {
logger.info("APM Agent ReportingEventHandler had been interrupted", e);
} finally {
healthy = true;
}
}
}

public boolean isHealthy() {
return healthy;
}

public long getReported() {
return reported;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,17 @@
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Future;
import javax.annotation.Nullable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

/**
* This reporter asynchronously reports {@link Transaction}s to the APM server
Expand All @@ -62,10 +63,17 @@ public void translateTo(ReportingEvent event, long sequence, Span s) {
event.setSpan(s);
}
};
private static final EventTranslator<ReportingEvent> FLUSH_EVENT_TRANSLATOR = new EventTranslator<ReportingEvent>() {
private static final EventTranslatorOneArg<ReportingEvent, Thread> END_REQUEST_EVENT_TRANSLATOR = new EventTranslatorOneArg<ReportingEvent, Thread>() {
@Override
public void translateTo(ReportingEvent event, long sequence, @Nullable Thread unparkAfterProcessed) {
event.setEndRequestEvent();
event.unparkAfterProcessed(unparkAfterProcessed);
}
};
private static final EventTranslator<ReportingEvent> WAKEUP_EVENT_TRANSLATOR = new EventTranslator<ReportingEvent>() {
@Override
public void translateTo(ReportingEvent event, long sequence) {
event.setFlushEvent();
event.setWakeupEvent();
}
};
private static final EventTranslatorOneArg<ReportingEvent, ErrorCapture> ERROR_EVENT_TRANSLATOR = new EventTranslatorOneArg<ReportingEvent, ErrorCapture>() {
Expand All @@ -80,10 +88,11 @@ public void translateTo(ReportingEvent event, long sequence, JsonWriter jsonWrit
event.setJsonWriter(jsonWriter);
}
};
private static final EventTranslator<ReportingEvent> SHUTDOWN_EVENT_TRANSLATOR = new EventTranslator<ReportingEvent>() {
private static final EventTranslatorOneArg<ReportingEvent, Thread> SHUTDOWN_EVENT_TRANSLATOR = new EventTranslatorOneArg<ReportingEvent, Thread>() {
@Override
public void translateTo(ReportingEvent event, long sequence) {
public void translateTo(ReportingEvent event, long sequence, @Nullable Thread unparkAfterProcessed) {
event.shutdownEvent();
event.unparkAfterProcessed(Thread.currentThread());
}
};

Expand Down Expand Up @@ -123,7 +132,7 @@ public void report(Transaction transaction) {
transaction.decrementReferences();
}
if (syncReport) {
waitForFlush();
flush();
}
}

Expand All @@ -133,16 +142,13 @@ public void report(Span span) {
span.decrementReferences();
}
if (syncReport) {
waitForFlush();
flush();
}
}

private void waitForFlush() {
try {
flush().get();
} catch (Exception e) {
throw new RuntimeException(e);
}
@Override
public boolean flush() {
return flush(-1, TimeUnit.NANOSECONDS);
}

@Override
Expand All @@ -155,83 +161,74 @@ public long getReported() {
return reportingEventHandler.getReported();
}

/**
* Flushes pending {@link ErrorCapture}s and {@link Transaction}s to the APM server.
* <p>
* This method may block for a while until a slot in the ring buffer becomes available.
* </p>
*
* @return A {@link Future} which resolves when the flush has been executed.
*/
@Override
public Future<Void> flush() {
final boolean success = disruptor.getRingBuffer().tryPublishEvent(FLUSH_EVENT_TRANSLATOR);
if (!success) {
throw new IllegalStateException("Ring buffer has no available slots");
}
final long cursor = disruptor.getCursor();
return new Future<Void>() {
private volatile boolean cancelled = false;

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (isDone()) {
return false;
}
disruptor.get(cursor).resetState();
// the volatile write also ensures visibility of the resetState() in other threads
cancelled = true;
return true;
}

@Override
public boolean isCancelled() {
return cancelled;
}

@Override
public boolean isDone() {
return isEventProcessed(cursor);
}
public void scheduleWakeupEvent() {
disruptor.getRingBuffer().tryPublishEvent(WAKEUP_EVENT_TRANSLATOR);
}

@Override
public Void get() throws InterruptedException {
while (!isDone()) {
Thread.sleep(1);
}
return null;
}
@Override
public boolean flush(long timeout, TimeUnit unit) {
return publishAndWaitForEvent(timeout, unit, END_REQUEST_EVENT_TRANSLATOR);
}

/*
* This might not a very elegant or efficient implementation but it is only intended to be used in tests anyway
*/
@Override
public Void get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
for (; timeout > 0 && !isDone(); timeout--) {
Thread.sleep(1);
private boolean publishAndWaitForEvent(long timeout, TimeUnit unit, EventTranslatorOneArg<ReportingEvent, Thread> eventTranslator) {
if (!reportingEventHandler.isHealthy()) {
return false;
}
ReportingEventHandler reportingEventHandler = this.reportingEventHandler;
long startNs = System.nanoTime();
long thresholdNs;
if (timeout < 0) {
thresholdNs = Long.MAX_VALUE;
} else {
thresholdNs = unit.toNanos(timeout) + startNs;
}
do {
try {
long sequence = disruptor.getRingBuffer().tryNext();
try {
eventTranslator.translateTo(disruptor.get(sequence), sequence, Thread.currentThread());
} finally {
disruptor.getRingBuffer().publish(sequence);
}
if (!isDone()) {
throw new TimeoutException();
return waitForEventProcessed(sequence, thresholdNs);
} catch (InsufficientCapacityException e) {
LockSupport.parkNanos(100_000);
if (Thread.currentThread().isInterrupted()) {
break;
}
return null;
}
};
} while (System.nanoTime() < thresholdNs && reportingEventHandler.isHealthy());
return false;
}

private boolean isEventProcessed(long sequence) {
return disruptor.getSequenceValueFor(reportingEventHandler) >= sequence;
private boolean waitForEventProcessed(long sequence, long thresholdNs) {
ReportingEventHandler reportingEventHandler = this.reportingEventHandler;
for (long nowNs = System.nanoTime();
nowNs < thresholdNs && reportingEventHandler.isHealthy() && !reportingEventHandler.isProcessed(sequence);
nowNs = System.nanoTime()) {

// periodically waking up to check if the connection turned unhealthy
// after the event has been published to the ring buffer
int minPeriodicWakeupNs = 10_000_000;
// after the event has been processed, this thread will be unparked and we'll return immediately
LockSupport.parkNanos(Math.min(minPeriodicWakeupNs, thresholdNs - nowNs));
if (Thread.currentThread().isInterrupted()) {
break;
}
}
return reportingEventHandler.isProcessed(sequence);
}

@Override
public void close() {
logger.info("dropped events because of full queue: {}", dropped.get());
disruptor.getRingBuffer().tryPublishEvent(SHUTDOWN_EVENT_TRANSLATOR);
publishAndWaitForEvent(5, TimeUnit.SECONDS, SHUTDOWN_EVENT_TRANSLATOR);
reportingEventHandler.close();
try {
disruptor.shutdown(5, TimeUnit.SECONDS);
disruptor.shutdown(1, TimeUnit.SECONDS);
} catch (com.lmax.disruptor.TimeoutException e) {
logger.warn("Timeout while shutting down disruptor");
}
reportingEventHandler.close();
}

@Override
Expand All @@ -240,7 +237,7 @@ public void report(ErrorCapture error) {
error.recycle();
}
if (syncReport) {
waitForFlush();
flush();
}
}

Expand All @@ -251,7 +248,7 @@ public void report(JsonWriter jsonWriter) {
}
tryAddEventToRingBuffer(jsonWriter, JSON_WRITER_EVENT_TRANSLATOR);
if (syncReport) {
waitForFlush();
flush();
}
}

Expand Down
Loading

0 comments on commit 96c5ec6

Please sign in to comment.