Skip to content

Commit

Permalink
Flush reporter on shutdown (#397)
Browse files Browse the repository at this point in the history
  • Loading branch information
felixbarny authored Jan 7, 2019
1 parent 5ce62af commit 64a6c21
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.concurrent.Future;
Expand All @@ -49,6 +51,8 @@
*/
public class ApmServerReporter implements Reporter {

private static final Logger logger = LoggerFactory.getLogger(ApmServerReporter.class);

private static final EventTranslatorOneArg<ReportingEvent, Transaction> TRANSACTION_EVENT_TRANSLATOR = new EventTranslatorOneArg<ReportingEvent, Transaction>() {
@Override
public void translateTo(ReportingEvent event, long sequence, Transaction t) {
Expand All @@ -73,6 +77,12 @@ public void translateTo(ReportingEvent event, long sequence, ErrorCapture error)
event.setError(error);
}
};
private static final EventTranslator<ReportingEvent> SHUTDOWN_EVENT_TRANSLATOR = new EventTranslator<ReportingEvent>() {
@Override
public void translateTo(ReportingEvent event, long sequence) {
event.shutdownEvent();
}
};

private final Disruptor<ReportingEvent> disruptor;
private final AtomicLong dropped = new AtomicLong();
Expand Down Expand Up @@ -206,7 +216,12 @@ private boolean isEventProcessed(long sequence) {

@Override
public void close() {
disruptor.shutdown();
disruptor.publishEvent(SHUTDOWN_EVENT_TRANSLATOR);
try {
disruptor.shutdown(5, TimeUnit.SECONDS);
} catch (com.lmax.disruptor.TimeoutException e) {
logger.warn("Timeout while shutting down disruptor");
}
reportingEventHandler.close();
if (metricsReportingScheduler != null) {
metricsReportingScheduler.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class IntakeV2ReportingEventHandler implements ReportingEventHandler {
private TimerTask timeoutTask;
private int errorCount;
private long gracePeriodEnd;
private boolean shutDown;

public IntakeV2ReportingEventHandler(Service service, ProcessInfo process, SystemInfo system,
ReporterConfiguration reporterConfiguration, ProcessorEventHandler processorEventHandler,
Expand Down Expand Up @@ -142,7 +143,9 @@ public void onEvent(ReportingEvent event, long sequence, boolean endOfBatch) {
logger.debug("Receiving {} event (sequence {})", event.getType(), sequence);
}
try {
handleEvent(event, sequence, endOfBatch);
if (!shutDown) {
handleEvent(event, sequence, endOfBatch);
}
} finally {
event.resetState();
}
Expand All @@ -155,6 +158,10 @@ private void handleEvent(ReportingEvent event, long sequence, boolean endOfBatch
} else if (event.getType() == ReportingEvent.ReportingEventType.FLUSH) {
flush();
return;
} else if (event.getType() == ReportingEvent.ReportingEventType.SHUTDOWN) {
shutDown = true;
flush();
return;
}
processorEventHandler.onEvent(event, sequence, endOfBatch);
if (connection == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static co.elastic.apm.agent.report.ReportingEvent.ReportingEventType.ERROR;
import static co.elastic.apm.agent.report.ReportingEvent.ReportingEventType.FLUSH;
import static co.elastic.apm.agent.report.ReportingEvent.ReportingEventType.METRICS;
import static co.elastic.apm.agent.report.ReportingEvent.ReportingEventType.SHUTDOWN;
import static co.elastic.apm.agent.report.ReportingEvent.ReportingEventType.SPAN;
import static co.elastic.apm.agent.report.ReportingEvent.ReportingEventType.TRANSACTION;

Expand Down Expand Up @@ -96,12 +97,16 @@ public void reportMetrics(MetricRegistry metricRegistry) {
this.type = METRICS;
}

public void shutdownEvent() {
this.type = SHUTDOWN;
}

@Nullable
public MetricRegistry getMetricRegistry() {
return metricRegistry;
}

enum ReportingEventType {
FLUSH, TRANSACTION, SPAN, ERROR, METRICS
FLUSH, TRANSACTION, SPAN, ERROR, METRICS, SHUTDOWN
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,19 @@ void testReport() {
assertThat(ndJsonNodes.get(3).get("error")).isNotNull();
}

@Test
void testShutDown() {
reportTransaction();
sendShutdownEvent();
reportSpan();
reportingEventHandler.flush();

final List<JsonNode> ndJsonNodes = getNdJsonNodes();
assertThat(ndJsonNodes).hasSize(2);
assertThat(ndJsonNodes.get(0).get("metadata")).isNotNull();
assertThat(ndJsonNodes.get(1).get("transaction")).isNotNull();
}

@Test
void testReportRoundRobinOnServerError() {
mockApmServer1.stubFor(post(INTAKE_V2_URL).willReturn(serviceUnavailable()));
Expand Down Expand Up @@ -191,6 +204,12 @@ private void reportError() {
reportingEventHandler.onEvent(reportingEvent, -1, true);
}

private void sendShutdownEvent() {
final ReportingEvent reportingEvent = new ReportingEvent();
reportingEvent.shutdownEvent();
reportingEventHandler.onEvent(reportingEvent, -1, true);
}

private List<JsonNode> getNdJsonNodes() {
return Stream.of(mockApmServer1, mockApmServer2)
.flatMap(apmServer -> apmServer.findAll(postRequestedFor(urlEqualTo(INTAKE_V2_URL))).stream())
Expand Down

0 comments on commit 64a6c21

Please sign in to comment.