From 64a6c21d3b3cfe50b1fd58e7d8bbe62daccd13b7 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Mon, 7 Jan 2019 09:43:13 +0100 Subject: [PATCH] Flush reporter on shutdown (#397) reference: https://discuss.elastic.co/t/debugging-information-for-java-agent/160925/24 --- .../apm/agent/report/ApmServerReporter.java | 17 ++++++++++++++++- .../report/IntakeV2ReportingEventHandler.java | 9 ++++++++- .../apm/agent/report/ReportingEvent.java | 7 ++++++- .../IntakeV2ReportingEventHandlerTest.java | 19 +++++++++++++++++++ 4 files changed, 49 insertions(+), 3 deletions(-) diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java index 1acf0970a7..0aa89cf176 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ApmServerReporter.java @@ -32,6 +32,8 @@ import com.lmax.disruptor.PhasedBackoffWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.util.concurrent.Future; @@ -49,6 +51,8 @@ */ public class ApmServerReporter implements Reporter { + private static final Logger logger = LoggerFactory.getLogger(ApmServerReporter.class); + private static final EventTranslatorOneArg TRANSACTION_EVENT_TRANSLATOR = new EventTranslatorOneArg() { @Override public void translateTo(ReportingEvent event, long sequence, Transaction t) { @@ -73,6 +77,12 @@ public void translateTo(ReportingEvent event, long sequence, ErrorCapture error) event.setError(error); } }; + private static final EventTranslator SHUTDOWN_EVENT_TRANSLATOR = new EventTranslator() { + @Override + public void translateTo(ReportingEvent event, long sequence) { + event.shutdownEvent(); + } + }; private final Disruptor disruptor; private final AtomicLong dropped = new AtomicLong(); @@ -206,7 +216,12 @@ private boolean isEventProcessed(long sequence) { @Override public void close() { - disruptor.shutdown(); + disruptor.publishEvent(SHUTDOWN_EVENT_TRANSLATOR); + try { + disruptor.shutdown(5, TimeUnit.SECONDS); + } catch (com.lmax.disruptor.TimeoutException e) { + logger.warn("Timeout while shutting down disruptor"); + } reportingEventHandler.close(); if (metricsReportingScheduler != null) { metricsReportingScheduler.shutdown(); diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandler.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandler.java index a7de06e3c4..35c6fd785c 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandler.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandler.java @@ -82,6 +82,7 @@ public class IntakeV2ReportingEventHandler implements ReportingEventHandler { private TimerTask timeoutTask; private int errorCount; private long gracePeriodEnd; + private boolean shutDown; public IntakeV2ReportingEventHandler(Service service, ProcessInfo process, SystemInfo system, ReporterConfiguration reporterConfiguration, ProcessorEventHandler processorEventHandler, @@ -142,7 +143,9 @@ public void onEvent(ReportingEvent event, long sequence, boolean endOfBatch) { logger.debug("Receiving {} event (sequence {})", event.getType(), sequence); } try { - handleEvent(event, sequence, endOfBatch); + if (!shutDown) { + handleEvent(event, sequence, endOfBatch); + } } finally { event.resetState(); } @@ -155,6 +158,10 @@ private void handleEvent(ReportingEvent event, long sequence, boolean endOfBatch } else if (event.getType() == ReportingEvent.ReportingEventType.FLUSH) { flush(); return; + } else if (event.getType() == ReportingEvent.ReportingEventType.SHUTDOWN) { + shutDown = true; + flush(); + return; } processorEventHandler.onEvent(event, sequence, endOfBatch); if (connection == null) { diff --git a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ReportingEvent.java b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ReportingEvent.java index bd5c23f50e..b7f9c6dcb3 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ReportingEvent.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/agent/report/ReportingEvent.java @@ -29,6 +29,7 @@ import static co.elastic.apm.agent.report.ReportingEvent.ReportingEventType.ERROR; import static co.elastic.apm.agent.report.ReportingEvent.ReportingEventType.FLUSH; import static co.elastic.apm.agent.report.ReportingEvent.ReportingEventType.METRICS; +import static co.elastic.apm.agent.report.ReportingEvent.ReportingEventType.SHUTDOWN; import static co.elastic.apm.agent.report.ReportingEvent.ReportingEventType.SPAN; import static co.elastic.apm.agent.report.ReportingEvent.ReportingEventType.TRANSACTION; @@ -96,12 +97,16 @@ public void reportMetrics(MetricRegistry metricRegistry) { this.type = METRICS; } + public void shutdownEvent() { + this.type = SHUTDOWN; + } + @Nullable public MetricRegistry getMetricRegistry() { return metricRegistry; } enum ReportingEventType { - FLUSH, TRANSACTION, SPAN, ERROR, METRICS + FLUSH, TRANSACTION, SPAN, ERROR, METRICS, SHUTDOWN } } diff --git a/apm-agent-core/src/test/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandlerTest.java b/apm-agent-core/src/test/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandlerTest.java index 98578010eb..59b6a4b39d 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandlerTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandlerTest.java @@ -133,6 +133,19 @@ void testReport() { assertThat(ndJsonNodes.get(3).get("error")).isNotNull(); } + @Test + void testShutDown() { + reportTransaction(); + sendShutdownEvent(); + reportSpan(); + reportingEventHandler.flush(); + + final List ndJsonNodes = getNdJsonNodes(); + assertThat(ndJsonNodes).hasSize(2); + assertThat(ndJsonNodes.get(0).get("metadata")).isNotNull(); + assertThat(ndJsonNodes.get(1).get("transaction")).isNotNull(); + } + @Test void testReportRoundRobinOnServerError() { mockApmServer1.stubFor(post(INTAKE_V2_URL).willReturn(serviceUnavailable())); @@ -191,6 +204,12 @@ private void reportError() { reportingEventHandler.onEvent(reportingEvent, -1, true); } + private void sendShutdownEvent() { + final ReportingEvent reportingEvent = new ReportingEvent(); + reportingEvent.shutdownEvent(); + reportingEventHandler.onEvent(reportingEvent, -1, true); + } + private List getNdJsonNodes() { return Stream.of(mockApmServer1, mockApmServer2) .flatMap(apmServer -> apmServer.findAll(postRequestedFor(urlEqualTo(INTAKE_V2_URL))).stream())