From 1fad6887d2e096f661c0f63860d13a4e0404394a Mon Sep 17 00:00:00 2001 From: James Davis Date: Fri, 16 Feb 2018 04:58:24 -0500 Subject: [PATCH] Implementing Retry logic [Reliable Channel] [STABLE Branch] (#561) * Initial commit of retry and backoff logic fixes * Fixing warnings on files I touched this round * Fix the eclipse UI from screaming about the docker Contstants * Fixed backoff logic to use existing method. Added more logging to the sender channel. * Added the partial response handler, more logging * Added gson to core. Fixed backoff manager to keep original functionality. Added extension to return the timeout values as expected before. * Added unit tests. * Fixing string typed ArrayList<> to List<> per Dhaval * Missed one * Making tests consistent. * Added javadoc comments, simplified logic for a few methods * Added exception logging per @dhaval24. Fixed formatting on touched files * Updates per last round of commits Moved the Handlers out of the concrete package to the common package to keep the same consistency. Removed a couple of unessecary methods. Added docs. * Latest fixes * Add MaxInstantRetry Added MaxInstantRetry configuration to allow for instantaneous retry on a failed transmission. * Javadoc Updates Javadoc and formatting updates * NumberFormatException fix Added null check * JavaDocs for TPM --- core/build.gradle | 2 + .../inprocess/InProcessTelemetryChannel.java | 628 +++++++++--------- .../InProcessTelemetryChannelFactory.java | 15 +- .../internal/channel/TransmissionHandler.java | 17 + .../channel/TransmissionHandlerArgs.java | 87 +++ .../channel/TransmissionHandlerObserver.java | 18 + .../internal/channel/TransmitterFactory.java | 16 + .../common/ActiveTransmissionLoader.java | 6 +- .../channel/common/BackendResponse.java | 20 + .../internal/channel/common/ErrorHandler.java | 70 ++ .../channel/common/PartialSuccessHandler.java | 192 ++++++ .../common/SenderThreadLocalBackOffData.java | 82 ++- .../common/SenderThreadsBackOffManager.java | 7 +- .../channel/common/ThrottlingHandler.java | 135 ++++ .../common/TransmissionFileSystemOutput.java | 4 +- .../common/TransmissionNetworkOutput.java | 500 +++++++------- .../channel/common/TransmissionPolicy.java | 3 +- .../common/TransmissionPolicyManager.java | 96 ++- .../common/TransmissionSendResult.java | 30 +- .../{Constants.java => ConstantsTest.java} | 2 +- .../docker/DockerContextInitializerTests.java | 2 +- .../channel/common/ErrorHandlerTest.java | 107 +++ .../common/PartialSuccessHandlerTest.java | 266 ++++++++ .../channel/common/ThrottlingHandlerTest.java | 145 ++++ .../InProcessTelemetryChannelTest.java | 22 + .../ThroughputTestTransmitterFactory.java | 11 + 26 files changed, 1844 insertions(+), 639 deletions(-) create mode 100644 core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandler.java create mode 100644 core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandlerArgs.java create mode 100644 core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandlerObserver.java create mode 100644 core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/BackendResponse.java create mode 100644 core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ErrorHandler.java create mode 100644 core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/PartialSuccessHandler.java create mode 100644 core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ThrottlingHandler.java rename core/src/test/java/com/microsoft/applicationinsights/extensibility/initializer/docker/{Constants.java => ConstantsTest.java} (98%) create mode 100644 core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/ErrorHandlerTest.java create mode 100644 core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/PartialSuccessHandlerTest.java create mode 100644 core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/ThrottlingHandlerTest.java diff --git a/core/build.gradle b/core/build.gradle index da579048d7e..c9bce568a34 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -44,6 +44,7 @@ dependencies { compile ([group: 'org.apache.commons', name: 'commons-lang3', version: '3.7']) compile ([group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.3']) compile ([group: 'com.google.guava', name: 'guava', version: '20.0']) + compile ([group: 'com.google.code.gson', name: 'gson', version: '2.8.2']) testCompile group: 'junit', name: 'junit', version: '4.12' testCompile group: 'org.mockito', name: 'mockito-all', version: '1.10.19' testCompile group: 'com.google.code.gson', name: 'gson', version: '2.8.2' @@ -56,6 +57,7 @@ shadowJar { relocate 'org.apache.commons', 'com.microsoft.applicationinsights.core.dependencies.apachecommons' relocate 'com.google.common', 'com.microsoft.applicationinsights.core.dependencies.googlecommon' relocate 'javax.annotation', 'com.microsoft.applicationinsights.core.dependencies.javaxannotation' + relocate 'com.google.gson', 'com.microsoft.applicationinsights.core.dependencies.gson' } jar { diff --git a/core/src/main/java/com/microsoft/applicationinsights/channel/concrete/inprocess/InProcessTelemetryChannel.java b/core/src/main/java/com/microsoft/applicationinsights/channel/concrete/inprocess/InProcessTelemetryChannel.java index 05bac6d1dbb..a01766ba919 100644 --- a/core/src/main/java/com/microsoft/applicationinsights/channel/concrete/inprocess/InProcessTelemetryChannel.java +++ b/core/src/main/java/com/microsoft/applicationinsights/channel/concrete/inprocess/InProcessTelemetryChannel.java @@ -22,7 +22,6 @@ package com.microsoft.applicationinsights.channel.concrete.inprocess; import java.io.IOException; -import java.io.PrintWriter; import java.io.StringWriter; import java.net.URI; import java.util.Map; @@ -38,7 +37,6 @@ import com.microsoft.applicationinsights.internal.util.LocalStringsUtils; import com.microsoft.applicationinsights.internal.util.Sanitizer; import com.microsoft.applicationinsights.telemetry.JsonTelemetryDataSerializer; -import com.microsoft.applicationinsights.telemetry.SupportSampling; import com.microsoft.applicationinsights.telemetry.Telemetry; import com.microsoft.applicationinsights.channel.TelemetryChannel; @@ -47,313 +45,343 @@ import org.apache.commons.lang3.exception.ExceptionUtils; /** - * An implementation of {@link com.microsoft.applicationinsights.channel.TelemetryChannel} + * An implementation of + * {@link com.microsoft.applicationinsights.channel.TelemetryChannel} *

* The channel holds two main entities: *

- * A buffer for incoming {@link com.microsoft.applicationinsights.telemetry.Telemetry} instances - * A transmitter + * A buffer for incoming + * {@link com.microsoft.applicationinsights.telemetry.Telemetry} instances A + * transmitter *

- * The buffer is stores incoming telemetry instances. Every new buffer starts a timer. - * When the timer expires, or when the buffer is 'full' (whichever happens first), the - * transmitter will pick up that buffer and will handle its sending to the server. For example, - * a transmitter will be responsible for compressing, sending and activate a policy in case of failures. + * The buffer is stores incoming telemetry instances. Every new buffer starts a + * timer. When the timer expires, or when the buffer is 'full' (whichever + * happens first), the transmitter will pick up that buffer and will handle its + * sending to the server. For example, a transmitter will be responsible for + * compressing, sending and activate a policy in case of failures. *

* The model here is: *

- * Use application threads to populate the buffer - * Use channel's threads to send buffers to the server + * Use application threads to populate the buffer Use channel's threads to send + * buffers to the server *

* Created by gupele on 12/17/2014. */ public final class InProcessTelemetryChannel implements TelemetryChannel { - private final static int DEFAULT_MAX_TELEMETRY_BUFFER_CAPACITY = 500; - private final static int MIN_MAX_TELEMETRY_BUFFER_CAPACITY = 1; - private final static int MAX_MAX_TELEMETRY_BUFFER_CAPACITY = 1000; - private final static String MAX_MAX_TELEMETRY_BUFFER_CAPACITY_NAME = "MaxTelemetryBufferCapacity"; - - private final static int DEFAULT_FLUSH_BUFFER_TIMEOUT_IN_SECONDS = 5; - private final static int MIN_FLUSH_BUFFER_TIMEOUT_IN_SECONDS = 1; - private final static int MAX_FLUSH_BUFFER_TIMEOUT_IN_SECONDS = 300; - private final static String FLUSH_BUFFER_TIMEOUT_IN_SECONDS_NAME = "FlushIntervalInSeconds"; - - private final static String DEVELOPER_MODE_SYSTEM_PROPRETY_NAME = "APPLICATION_INSIGHTS_DEVELOPER_MODE"; - - private final static String DEVELOPER_MODE_NAME = "DeveloperMode"; - private final static String ENDPOINT_ADDRESS_NAME = "EndpointAddress"; - private final static String MAX_TRANSMISSION_STORAGE_CAPACITY_NAME = "MaxTransmissionStorageFilesCapacityInMB"; - - private boolean developerMode = false; - private static TransmitterFactory s_transmitterFactory; - - private boolean stopped = false; - - private TelemetriesTransmitter telemetriesTransmitter; - - private TelemetryBuffer telemetryBuffer; - private TelemetrySampler telemetrySampler; - - private static AtomicLong itemsSent = new AtomicLong(0); - - public InProcessTelemetryChannel() { - boolean developerMode = false; - try { - String developerModeAsString = System.getProperty(DEVELOPER_MODE_SYSTEM_PROPRETY_NAME); - if (!LocalStringsUtils.isNullOrEmpty(developerModeAsString)) { - developerMode = Boolean.valueOf(developerModeAsString); - } - } catch (Throwable t) { - developerMode = false; - InternalLogger.INSTANCE.trace("%s generated exception in parsing," + - "stack trace is %s", DEVELOPER_MODE_SYSTEM_PROPRETY_NAME, ExceptionUtils.getStackTrace(t)); - } - initialize(null, - null, - developerMode, - createDefaultMaxTelemetryBufferCapacityEnforcer(null), - createDefaultSendIntervalInSecondsEnforcer(null), - true); - } - - /** - * Ctor - * - * @param endpointAddress Must be empty string or a valid uri, else an exception will be thrown - * @param developerMode True will behave in a 'non-production' mode to ease the debugging - * @param maxTelemetryBufferCapacity Max number of Telemetries we keep in the buffer, when reached we will send the buffer - * Note, value should be between TRANSMIT_BUFFER_MIN_TIMEOUT_IN_MILLIS and TRANSMIT_BUFFER_MAX_TIMEOUT_IN_MILLIS inclusive - * @param sendIntervalInMillis The maximum number of milliseconds to wait before we send the buffer - * Note, value should be between MIN_MAX_TELEMETRY_BUFFER_CAPACITY and MAX_MAX_TELEMETRY_BUFFER_CAPACITY inclusive - */ - public InProcessTelemetryChannel(String endpointAddress, boolean developerMode, int maxTelemetryBufferCapacity, int sendIntervalInMillis) { - initialize(endpointAddress, - null, - developerMode, - createDefaultMaxTelemetryBufferCapacityEnforcer(maxTelemetryBufferCapacity), - createDefaultSendIntervalInSecondsEnforcer(sendIntervalInMillis), - true); - } - - /** - * This Ctor will query the 'namesAndValues' map for data to initialize itself - * It will ignore data that is not of its interest, this Ctor is useful for building an instance from configuration - * - * @param namesAndValues - The data passed as name and value pairs - */ - public InProcessTelemetryChannel(Map namesAndValues) { - boolean developerMode = false; - String endpointAddress = null; - - LimitsEnforcer maxTelemetryBufferCapacityEnforcer = createDefaultMaxTelemetryBufferCapacityEnforcer(null); - - LimitsEnforcer sendIntervalInSecondsEnforcer = createDefaultSendIntervalInSecondsEnforcer(null); - - boolean throttling = true; - if (namesAndValues != null) { - throttling = Boolean.valueOf(namesAndValues.get("Throttling")); - developerMode = Boolean.valueOf(namesAndValues.get(DEVELOPER_MODE_NAME)); - if (!developerMode) { - developerMode = Boolean.valueOf(System.getProperty(DEVELOPER_MODE_SYSTEM_PROPRETY_NAME)); - } - endpointAddress = namesAndValues.get(ENDPOINT_ADDRESS_NAME); - - maxTelemetryBufferCapacityEnforcer.normalizeStringValue(namesAndValues.get(MAX_MAX_TELEMETRY_BUFFER_CAPACITY_NAME)); - sendIntervalInSecondsEnforcer.normalizeStringValue(namesAndValues.get(FLUSH_BUFFER_TIMEOUT_IN_SECONDS_NAME)); - } - - String maxTransmissionStorageCapacity = namesAndValues.get(MAX_TRANSMISSION_STORAGE_CAPACITY_NAME); - initialize(endpointAddress, maxTransmissionStorageCapacity, developerMode, maxTelemetryBufferCapacityEnforcer, sendIntervalInSecondsEnforcer, throttling); - } - - /** - * Gets value indicating whether this channel is in developer mode. - */ - @Override - public boolean isDeveloperMode() { - return developerMode; - } - - /** - * Sets value indicating whether this channel is in developer mode. - * - * @param developerMode True or false - */ - @Override - public void setDeveloperMode(boolean developerMode) { - if (developerMode != this.developerMode) { - this.developerMode = developerMode; - int maxTelemetriesInBatch = this.developerMode ? 1 : DEFAULT_MAX_TELEMETRY_BUFFER_CAPACITY; - - setMaxTelemetriesInBatch(maxTelemetriesInBatch); - } - } - - /** - * Sends a Telemetry instance through the channel. - */ - @Override - public void send(Telemetry telemetry) { - Preconditions.checkNotNull(telemetry, "Telemetry item must be non null"); - - if (isDeveloperMode()) { - telemetry.getContext().getProperties().put("DeveloperMode", "true"); - } - - if (telemetrySampler != null) { - if (!telemetrySampler.isSampledIn(telemetry)) { - return; - } - } - - StringWriter writer = new StringWriter(); - JsonTelemetryDataSerializer jsonWriter = null; - try { - jsonWriter = new JsonTelemetryDataSerializer(writer); - telemetry.serialize(jsonWriter); - jsonWriter.close(); - String asJson = writer.toString(); - telemetryBuffer.add(asJson); - telemetry.reset(); - if (itemsSent.incrementAndGet() % 10000 == 0) { - InternalLogger.INSTANCE.info("items sent till now %d", itemsSent.get()); - } - - } catch (IOException e) { - InternalLogger.INSTANCE.error("Failed to serialize Telemetry"); - InternalLogger.INSTANCE.trace("Stack trace is %s", ExceptionUtils.getStackTrace(e)); - return; - } - - if (isDeveloperMode()) { - writeTelemetryToDebugOutput(telemetry); - } - } - - /** - * Stops on going work - */ - @Override - public synchronized void stop(long timeout, TimeUnit timeUnit) { - try { - if (stopped) { - return; - } - - telemetriesTransmitter.stop(timeout, timeUnit); - stopped = true; - } catch (Throwable t) { - InternalLogger.INSTANCE.error("Exception generated while stopping telemetry transmitter"); - InternalLogger.INSTANCE.trace("Stack trace generated is %s", ExceptionUtils.getStackTrace(t)); - } - } - - /** - * Flushes the data that the channel might have internally. - */ - @Override - public void flush() { - telemetryBuffer.flush(); - } - - /** - * Sets an optional Sampler that can sample out telemetries - * Currently, we don't allow to replace a valid telemtry sampler. - * - * @param telemetrySampler - The sampler - */ - @Override - public void setSampler(TelemetrySampler telemetrySampler) { - if (this.telemetrySampler == null) { - this.telemetrySampler = telemetrySampler; - } - } - - /** - * Sets the buffer size - * - * @param maxTelemetriesInBatch should be between MIN_MAX_TELEMETRY_BUFFER_CAPACITY - * and MAX_MAX_TELEMETRY_BUFFER_CAPACITY inclusive - * if the number is lower than the minimum then the minimum will be used - * if the number is higher than the maximum then the maximum will be used - */ - public void setMaxTelemetriesInBatch(int maxTelemetriesInBatch) { - telemetryBuffer.setMaxTelemetriesInBatch(maxTelemetriesInBatch); - } - - /** - * Sets the time tow wait before flushing the internal buffer - * - * @param transmitBufferTimeoutInSeconds should be between MIN_FLUSH_BUFFER_TIMEOUT_IN_SECONDS - * and MAX_FLUSH_BUFFER_TIMEOUT_IN_SECONDS inclusive - * if the number is lower than the minimum then the minimum will be used - * if the number is higher than the maximum then the maximum will be used - */ - public void setTransmitBufferTimeoutInSeconds(int transmitBufferTimeoutInSeconds) { - telemetryBuffer.setTransmitBufferTimeoutInSeconds(transmitBufferTimeoutInSeconds); - } - - private void writeTelemetryToDebugOutput(Telemetry telemetry) { - InternalLogger.INSTANCE.trace("InProcessTelemetryChannel sending telemetry"); - } - - private synchronized void initialize(String endpointAddress, - String maxTransmissionStorageCapacity, - boolean developerMode, - LimitsEnforcer maxTelemetryBufferCapacityEnforcer, - LimitsEnforcer sendIntervalInSeconds, - boolean throttling) { - makeSureEndpointAddressIsValid(endpointAddress); - - if (s_transmitterFactory == null) { - s_transmitterFactory = new InProcessTelemetryChannelFactory(); - } - - telemetriesTransmitter = s_transmitterFactory.create(endpointAddress, maxTransmissionStorageCapacity, throttling); - telemetryBuffer = new TelemetryBuffer(telemetriesTransmitter, maxTelemetryBufferCapacityEnforcer, sendIntervalInSeconds); - - setDeveloperMode(developerMode); - } - - /** - * The method will throw IllegalArgumentException if the endpointAddress is not a valid uri - * Please note that a null or empty string is valid as far as the class is concerned and thus considered valid - * - * @param endpointAddress - */ - private void makeSureEndpointAddressIsValid(String endpointAddress) { - if (Strings.isNullOrEmpty(endpointAddress)) { - return; - } - - URI uri = Sanitizer.sanitizeUri(endpointAddress); - if (uri == null) { - String errorMessage = String.format("Endpoint address %s is not a valid uri", endpointAddress); - InternalLogger.INSTANCE.error(errorMessage); - throw new IllegalArgumentException(errorMessage); - } - } - - private LimitsEnforcer createDefaultMaxTelemetryBufferCapacityEnforcer(Integer currentValue) { - LimitsEnforcer maxItemsInBatchEnforcer = - LimitsEnforcer.createWithClosestLimitOnError( - MAX_MAX_TELEMETRY_BUFFER_CAPACITY_NAME, - MIN_MAX_TELEMETRY_BUFFER_CAPACITY, - MAX_MAX_TELEMETRY_BUFFER_CAPACITY, - DEFAULT_MAX_TELEMETRY_BUFFER_CAPACITY, - currentValue == null ? DEFAULT_MAX_TELEMETRY_BUFFER_CAPACITY : currentValue); - - return maxItemsInBatchEnforcer; - } - - private LimitsEnforcer createDefaultSendIntervalInSecondsEnforcer(Integer currentValue) { - LimitsEnforcer sendIntervalInSecondsEnforcer = - LimitsEnforcer.createWithClosestLimitOnError( - FLUSH_BUFFER_TIMEOUT_IN_SECONDS_NAME, - MIN_FLUSH_BUFFER_TIMEOUT_IN_SECONDS, - MAX_FLUSH_BUFFER_TIMEOUT_IN_SECONDS, - DEFAULT_FLUSH_BUFFER_TIMEOUT_IN_SECONDS, - currentValue == null ? DEFAULT_FLUSH_BUFFER_TIMEOUT_IN_SECONDS : currentValue); - - return sendIntervalInSecondsEnforcer; - } + + private final static String INSTANT_RETRY_NAME = "MaxInstantRetry"; + private final static int DEFAULT_MAX_INSTANT_RETRY = 3; + private final static int DEFAULT_MAX_TELEMETRY_BUFFER_CAPACITY = 500; + private final static int MIN_MAX_TELEMETRY_BUFFER_CAPACITY = 1; + private final static int MAX_MAX_TELEMETRY_BUFFER_CAPACITY = 1000; + private final static String MAX_MAX_TELEMETRY_BUFFER_CAPACITY_NAME = "MaxTelemetryBufferCapacity"; + + private final static int DEFAULT_FLUSH_BUFFER_TIMEOUT_IN_SECONDS = 5; + private final static int MIN_FLUSH_BUFFER_TIMEOUT_IN_SECONDS = 1; + private final static int MAX_FLUSH_BUFFER_TIMEOUT_IN_SECONDS = 300; + private final static String FLUSH_BUFFER_TIMEOUT_IN_SECONDS_NAME = "FlushIntervalInSeconds"; + + private final static String DEVELOPER_MODE_SYSTEM_PROPRETY_NAME = "APPLICATION_INSIGHTS_DEVELOPER_MODE"; + + private final static String DEVELOPER_MODE_NAME = "DeveloperMode"; + private final static String ENDPOINT_ADDRESS_NAME = "EndpointAddress"; + private final static String MAX_TRANSMISSION_STORAGE_CAPACITY_NAME = "MaxTransmissionStorageFilesCapacityInMB"; + + private boolean developerMode = false; + private static TransmitterFactory s_transmitterFactory; + + private boolean stopped = false; + + private TelemetriesTransmitter telemetriesTransmitter; + + private TelemetryBuffer telemetryBuffer; + private TelemetrySampler telemetrySampler; + + private static AtomicLong itemsSent = new AtomicLong(0); + + public InProcessTelemetryChannel() { + boolean developerMode = false; + try { + String developerModeAsString = System.getProperty(DEVELOPER_MODE_SYSTEM_PROPRETY_NAME); + if (!LocalStringsUtils.isNullOrEmpty(developerModeAsString)) { + developerMode = Boolean.valueOf(developerModeAsString); + } + } catch (Throwable t) { + developerMode = false; + InternalLogger.INSTANCE.trace("%s generated exception in parsing," + "stack trace is %s", + DEVELOPER_MODE_SYSTEM_PROPRETY_NAME, ExceptionUtils.getStackTrace(t)); + } + initialize(null, null, developerMode, createDefaultMaxTelemetryBufferCapacityEnforcer(null), + createDefaultSendIntervalInSecondsEnforcer(null), true); + } + + /** + * Ctor + * + * @param endpointAddress + * Must be empty string or a valid uri, else an exception will be + * thrown + * @param developerMode + * True will behave in a 'non-production' mode to ease the debugging + * @param maxTelemetryBufferCapacity + * Max number of Telemetries we keep in the buffer, when reached we + * will send the buffer Note, value should be between + * TRANSMIT_BUFFER_MIN_TIMEOUT_IN_MILLIS and + * TRANSMIT_BUFFER_MAX_TIMEOUT_IN_MILLIS inclusive + * @param sendIntervalInMillis + * The maximum number of milliseconds to wait before we send the + * buffer Note, value should be between + * MIN_MAX_TELEMETRY_BUFFER_CAPACITY and + * MAX_MAX_TELEMETRY_BUFFER_CAPACITY inclusive + */ + public InProcessTelemetryChannel(String endpointAddress, boolean developerMode, int maxTelemetryBufferCapacity, + int sendIntervalInMillis) { + initialize(endpointAddress, null, developerMode, + createDefaultMaxTelemetryBufferCapacityEnforcer(maxTelemetryBufferCapacity), + createDefaultSendIntervalInSecondsEnforcer(sendIntervalInMillis), true); + } + + /** + * This Ctor will query the 'namesAndValues' map for data to initialize itself + * It will ignore data that is not of its interest, this Ctor is useful for + * building an instance from configuration + * + * @param namesAndValues + * - The data passed as name and value pairs + */ + public InProcessTelemetryChannel(Map namesAndValues) { + boolean developerMode = false; + String endpointAddress = null; + int maxInstantRetries = DEFAULT_MAX_INSTANT_RETRY; + + LimitsEnforcer maxTelemetryBufferCapacityEnforcer = createDefaultMaxTelemetryBufferCapacityEnforcer(null); + + LimitsEnforcer sendIntervalInSecondsEnforcer = createDefaultSendIntervalInSecondsEnforcer(null); + + boolean throttling = true; + if (namesAndValues != null) { + throttling = Boolean.valueOf(namesAndValues.get("Throttling")); + developerMode = Boolean.valueOf(namesAndValues.get(DEVELOPER_MODE_NAME)); + try { + String instantRetryValue = namesAndValues.get(INSTANT_RETRY_NAME); + if (instantRetryValue != null){ + maxInstantRetries = Integer.parseInt(instantRetryValue); + } + + } catch (NumberFormatException e) { + InternalLogger.INSTANCE.error("Unable to parse configuration setting %s to integer value.%nStack Trace:%n%s", INSTANT_RETRY_NAME, ExceptionUtils.getStackTrace(e)); + } + + if (!developerMode) { + developerMode = Boolean.valueOf(System.getProperty(DEVELOPER_MODE_SYSTEM_PROPRETY_NAME)); + } + endpointAddress = namesAndValues.get(ENDPOINT_ADDRESS_NAME); + + maxTelemetryBufferCapacityEnforcer + .normalizeStringValue(namesAndValues.get(MAX_MAX_TELEMETRY_BUFFER_CAPACITY_NAME)); + sendIntervalInSecondsEnforcer + .normalizeStringValue(namesAndValues.get(FLUSH_BUFFER_TIMEOUT_IN_SECONDS_NAME)); + } + + String maxTransmissionStorageCapacity = namesAndValues.get(MAX_TRANSMISSION_STORAGE_CAPACITY_NAME); + initialize(endpointAddress, maxTransmissionStorageCapacity, developerMode, maxTelemetryBufferCapacityEnforcer, + sendIntervalInSecondsEnforcer, throttling, maxInstantRetries); + } + + /** + * Gets value indicating whether this channel is in developer mode. + */ + @Override + public boolean isDeveloperMode() { + return developerMode; + } + + /** + * Sets value indicating whether this channel is in developer mode. + * + * @param developerMode + * True or false + */ + @Override + public void setDeveloperMode(boolean developerMode) { + if (developerMode != this.developerMode) { + this.developerMode = developerMode; + int maxTelemetriesInBatch = this.developerMode ? 1 : DEFAULT_MAX_TELEMETRY_BUFFER_CAPACITY; + + setMaxTelemetriesInBatch(maxTelemetriesInBatch); + } + } + + /** + * Sends a Telemetry instance through the channel. + */ + @Override + public void send(Telemetry telemetry) { + Preconditions.checkNotNull(telemetry, "Telemetry item must be non null"); + + if (isDeveloperMode()) { + telemetry.getContext().getProperties().put("DeveloperMode", "true"); + } + + if (telemetrySampler != null) { + if (!telemetrySampler.isSampledIn(telemetry)) { + return; + } + } + + StringWriter writer = new StringWriter(); + JsonTelemetryDataSerializer jsonWriter = null; + try { + jsonWriter = new JsonTelemetryDataSerializer(writer); + telemetry.serialize(jsonWriter); + jsonWriter.close(); + String asJson = writer.toString(); + telemetryBuffer.add(asJson); + telemetry.reset(); + if (itemsSent.incrementAndGet() % 10000 == 0) { + InternalLogger.INSTANCE.info("items sent till now %d", itemsSent.get()); + } + + } catch (IOException e) { + InternalLogger.INSTANCE.error("Failed to serialize Telemetry"); + InternalLogger.INSTANCE.trace("Stack trace is %s", ExceptionUtils.getStackTrace(e)); + return; + } + + if (isDeveloperMode()) { + writeTelemetryToDebugOutput(telemetry); + } + } + + /** + * Stops on going work + */ + @Override + public synchronized void stop(long timeout, TimeUnit timeUnit) { + try { + if (stopped) { + return; + } + + telemetriesTransmitter.stop(timeout, timeUnit); + stopped = true; + } catch (Throwable t) { + InternalLogger.INSTANCE.error("Exception generated while stopping telemetry transmitter"); + InternalLogger.INSTANCE.trace("Stack trace generated is %s", ExceptionUtils.getStackTrace(t)); + } + } + + /** + * Flushes the data that the channel might have internally. + */ + @Override + public void flush() { + telemetryBuffer.flush(); + } + + /** + * Sets an optional Sampler that can sample out telemetries Currently, we don't + * allow to replace a valid telemtry sampler. + * + * @param telemetrySampler + * - The sampler + */ + @Override + public void setSampler(TelemetrySampler telemetrySampler) { + if (this.telemetrySampler == null) { + this.telemetrySampler = telemetrySampler; + } + } + + /** + * Sets the buffer size + * + * @param maxTelemetriesInBatch + * should be between MIN_MAX_TELEMETRY_BUFFER_CAPACITY and + * MAX_MAX_TELEMETRY_BUFFER_CAPACITY inclusive if the number is lower + * than the minimum then the minimum will be used if the number is + * higher than the maximum then the maximum will be used + */ + public void setMaxTelemetriesInBatch(int maxTelemetriesInBatch) { + telemetryBuffer.setMaxTelemetriesInBatch(maxTelemetriesInBatch); + } + + /** + * Sets the time tow wait before flushing the internal buffer + * + * @param transmitBufferTimeoutInSeconds + * should be between MIN_FLUSH_BUFFER_TIMEOUT_IN_SECONDS and + * MAX_FLUSH_BUFFER_TIMEOUT_IN_SECONDS inclusive if the number is + * lower than the minimum then the minimum will be used if the number + * is higher than the maximum then the maximum will be used + */ + public void setTransmitBufferTimeoutInSeconds(int transmitBufferTimeoutInSeconds) { + telemetryBuffer.setTransmitBufferTimeoutInSeconds(transmitBufferTimeoutInSeconds); + } + + private void writeTelemetryToDebugOutput(Telemetry telemetry) { + InternalLogger.INSTANCE.trace("InProcessTelemetryChannel sending telemetry"); + } + + private synchronized void initialize(String endpointAddress, String maxTransmissionStorageCapacity, + boolean developerMode, LimitsEnforcer maxTelemetryBufferCapacityEnforcer, + LimitsEnforcer sendIntervalInSeconds, boolean throttling) { + initialize(endpointAddress, maxTransmissionStorageCapacity, developerMode, maxTelemetryBufferCapacityEnforcer, + sendIntervalInSeconds, throttling, DEFAULT_MAX_INSTANT_RETRY); + } + + private synchronized void initialize(String endpointAddress, String maxTransmissionStorageCapacity, + boolean developerMode, LimitsEnforcer maxTelemetryBufferCapacityEnforcer, + LimitsEnforcer sendIntervalInSeconds, boolean throttling, int maxInstantRetry) { + makeSureEndpointAddressIsValid(endpointAddress); + + if (s_transmitterFactory == null) { + s_transmitterFactory = new InProcessTelemetryChannelFactory(); + } + + telemetriesTransmitter = s_transmitterFactory.create(endpointAddress, maxTransmissionStorageCapacity, + throttling, maxInstantRetry); + telemetryBuffer = new TelemetryBuffer(telemetriesTransmitter, maxTelemetryBufferCapacityEnforcer, + sendIntervalInSeconds); + + setDeveloperMode(developerMode); + } + + /** + * The method will throw IllegalArgumentException if the endpointAddress is not + * a valid uri Please note that a null or empty string is valid as far as the + * class is concerned and thus considered valid + * + * @param endpointAddress + */ + private void makeSureEndpointAddressIsValid(String endpointAddress) { + if (Strings.isNullOrEmpty(endpointAddress)) { + return; + } + + URI uri = Sanitizer.sanitizeUri(endpointAddress); + if (uri == null) { + String errorMessage = String.format("Endpoint address %s is not a valid uri", endpointAddress); + InternalLogger.INSTANCE.error(errorMessage); + throw new IllegalArgumentException(errorMessage); + } + } + + private LimitsEnforcer createDefaultMaxTelemetryBufferCapacityEnforcer(Integer currentValue) { + LimitsEnforcer maxItemsInBatchEnforcer = LimitsEnforcer.createWithClosestLimitOnError( + MAX_MAX_TELEMETRY_BUFFER_CAPACITY_NAME, MIN_MAX_TELEMETRY_BUFFER_CAPACITY, + MAX_MAX_TELEMETRY_BUFFER_CAPACITY, DEFAULT_MAX_TELEMETRY_BUFFER_CAPACITY, + currentValue == null ? DEFAULT_MAX_TELEMETRY_BUFFER_CAPACITY : currentValue); + + return maxItemsInBatchEnforcer; + } + + private LimitsEnforcer createDefaultSendIntervalInSecondsEnforcer(Integer currentValue) { + LimitsEnforcer sendIntervalInSecondsEnforcer = LimitsEnforcer.createWithClosestLimitOnError( + FLUSH_BUFFER_TIMEOUT_IN_SECONDS_NAME, MIN_FLUSH_BUFFER_TIMEOUT_IN_SECONDS, + MAX_FLUSH_BUFFER_TIMEOUT_IN_SECONDS, DEFAULT_FLUSH_BUFFER_TIMEOUT_IN_SECONDS, + currentValue == null ? DEFAULT_FLUSH_BUFFER_TIMEOUT_IN_SECONDS : currentValue); + + return sendIntervalInSecondsEnforcer; + } } diff --git a/core/src/main/java/com/microsoft/applicationinsights/channel/concrete/inprocess/InProcessTelemetryChannelFactory.java b/core/src/main/java/com/microsoft/applicationinsights/channel/concrete/inprocess/InProcessTelemetryChannelFactory.java index dfa01e53de5..4a687fcd72c 100644 --- a/core/src/main/java/com/microsoft/applicationinsights/channel/concrete/inprocess/InProcessTelemetryChannelFactory.java +++ b/core/src/main/java/com/microsoft/applicationinsights/channel/concrete/inprocess/InProcessTelemetryChannelFactory.java @@ -33,10 +33,18 @@ * Created by gupele on 1/15/2015. */ final class InProcessTelemetryChannelFactory implements TransmitterFactory { + private final int DEFAULT_RETRY = 3; + @Override + public TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled) { + return create(endpoint, maxTransmissionStorageCapacity, throttlingIsEnabled, DEFAULT_RETRY); + } @Override - public TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled) { - final TransmissionPolicyManager transmissionPolicyManager = new TransmissionPolicyManager(throttlingIsEnabled); - + public TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled, int maxInstantRetries) { + final TransmissionPolicyManager transmissionPolicyManager = new TransmissionPolicyManager(throttlingIsEnabled); + transmissionPolicyManager.addTransmissionHandler(new ErrorHandler(transmissionPolicyManager)); + transmissionPolicyManager.addTransmissionHandler(new PartialSuccessHandler(transmissionPolicyManager)); + transmissionPolicyManager.addTransmissionHandler(new ThrottlingHandler(transmissionPolicyManager)); + transmissionPolicyManager.setMaxInstantRetries(maxInstantRetries); // An active object with the network sender TransmissionNetworkOutput actualNetworkSender = TransmissionNetworkOutput.create(endpoint, transmissionPolicyManager); @@ -51,6 +59,7 @@ public TelemetriesTransmitter create(String endpoint, String maxTransmissionStor // The dispatcher works with the two active senders TransmissionDispatcher dispatcher = new NonBlockingDispatcher(new TransmissionOutput[] {networkSender, activeFileSystemOutput}); actualNetworkSender.setTransmissionDispatcher(dispatcher); + // The loader works with the file system loader as the active one does TransmissionsLoader transmissionsLoader = new ActiveTransmissionLoader(fileSystemSender, stateFetcher, dispatcher); diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandler.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandler.java new file mode 100644 index 00000000000..957fa314c37 --- /dev/null +++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandler.java @@ -0,0 +1,17 @@ +package com.microsoft.applicationinsights.internal.channel; + +/** + * An interface that is used to create a concrete class that is called by the the {@link TransmissionHandlerObserver} + *

+ * This is used to implement classes like {@link ErrorHandler} and {@link PartialSuccessHandler}. + * @author jamdavi + * + * + */ +public interface TransmissionHandler { + /** + * Called when a transmission is sent by the {@link TransmissionOutput}. + * @param args The {@link TransmissionHandlerArgs} for this handler. + */ + void onTransmissionSent(TransmissionHandlerArgs args); +} diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandlerArgs.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandlerArgs.java new file mode 100644 index 00000000000..cb767d206af --- /dev/null +++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandlerArgs.java @@ -0,0 +1,87 @@ +package com.microsoft.applicationinsights.internal.channel; + +import org.apache.http.Header; + +import com.microsoft.applicationinsights.internal.channel.common.Transmission; + +/** + * This class is used to store information between the transmission sender and the transmission handlers + *

+ * An example class that uses this are {@link ErrorHandler} + * @author jamdavi + * + */ +public class TransmissionHandlerArgs { + private String responseBody; + /** + * Set the response body. + * @param body The HTTP Response from the sender + */ + public void setResponseBody(String body) { this.responseBody = body;} + /** + * Get the response body + * @return The HTTP Response from the sender + */ + public String getResponseBody() { return this.responseBody;} + + + private TransmissionDispatcher transmissionDispatcher; + /** + * Set the {@link TransmissionDispatcher} used by the sender + * @param dispatcher The {@link TransmissionDispatcher} used by the sender + */ + public void setTransmissionDispatcher(TransmissionDispatcher dispatcher) { this.transmissionDispatcher = dispatcher;} + /** + * Get the {@link TransmissionDispatcher} used by the sender + * @return The {@link TransmissionDispatcher} used by the sender + */ + public TransmissionDispatcher getTransmissionDispatcher() { return this.transmissionDispatcher;} + + private Transmission transmission; + /** + * Set the transmission that needs to be passed to the handler. + * @param transmission The transmission that needs to be passed to the handler. + */ + public void setTransmission(Transmission transmission) { this.transmission = transmission;} + /** + * Get the transmission that needs to be passed to the handler. + * @return The transmission used by the handler. + */ + public Transmission getTransmission() { return this.transmission;} + + private int responseCode; + /** + * Set the response code to be passed to the handler. + * @param code The HTTP response code. + */ + public void setResponseCode(int code) { this.responseCode = code;} + /** + * Get the response code for the handler to use. + * @return The HTTP response code. + */ + public int getResponseCode() { return this.responseCode;} + + private Throwable exception; + /** + * Set the exception thrown by the sender to be passed the handler. + * @param ex The exception + */ + public void setException(Throwable ex) { this.exception = ex;} + /** + * Get the exception thrown by the sender to be used by the handler. + * @return The exception + */ + public Throwable getException() { return this.exception;} + + private Header retryHeader; + /** + * Set the Retry-After header to be passed to the handler. + * @param head The Retry-After header + */ + public void setRetryHeader(Header head) { this.retryHeader = head;} + /** + * Get the Retry-After header to be passed to the handler. + * @return The Retry-After header + */ + public Header getRetryHeader() { return this.retryHeader;} +} diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandlerObserver.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandlerObserver.java new file mode 100644 index 00000000000..e4b4ba65449 --- /dev/null +++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandlerObserver.java @@ -0,0 +1,18 @@ +package com.microsoft.applicationinsights.internal.channel; + + +/** + * Enables the {@link TransmissionPolicyManager} to handle transmission states. + *

+ * This interface extends {@TransmissionHandler} to add the ability to observe when the transmission is completed. + * @author jamdavi + * + */ +public interface TransmissionHandlerObserver extends TransmissionHandler { + + /** + * Used to add a {@link TransmissionHandler} to the collection stored by the {@link TransmissionPolicyManager} + * @param handler The handler to add to the collection. + */ + void addTransmissionHandler(TransmissionHandler handler); +} diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmitterFactory.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmitterFactory.java index c18b8d16b0d..aa46694f96d 100644 --- a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmitterFactory.java +++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmitterFactory.java @@ -25,5 +25,21 @@ * Created by gupele on 12/21/2014. */ public interface TransmitterFactory { + /** + * Creates the {@link TelemetriesTransmitter} for use by the {@link TelemetryChannel} + * @param endpoint HTTP Endpoint to send telemetry to + * @param maxTransmissionStorageCapacity Max amount of disk space in KB for persistent storage to use + * @param throttlingIsEnabled Allow the network telemetry sender to be throttled + * @return The {@link TelemetriesTransmitter} object + */ TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled); + /** + * Creates the {@link TelemetriesTransmitter} for use by the {@link TelemetryChannel} + * @param endpoint HTTP Endpoint to send telemetry to + * @param maxTransmissionStorageCapacity Max amount of disk space in KB for persistent storage to use + * @param throttlingIsEnabled Allow the network telemetry sender to be throttled + * @param maxInstantRetries Number of instant retries in case of a temporary network outage + * @return The {@link TelemetriesTransmitter} object + */ + TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled, int maxInstantRetries); } diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ActiveTransmissionLoader.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ActiveTransmissionLoader.java index e149d851a8c..bcd936e6edc 100644 --- a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ActiveTransmissionLoader.java +++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ActiveTransmissionLoader.java @@ -26,13 +26,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.base.Preconditions; import com.microsoft.applicationinsights.internal.channel.TransmissionDispatcher; import com.microsoft.applicationinsights.internal.channel.TransmissionsLoader; import com.microsoft.applicationinsights.internal.logger.InternalLogger; -import com.google.common.base.Preconditions; -import com.microsoft.applicationinsights.internal.shutdown.Stoppable; - /** * The class is responsible for loading transmission files that were saved to the disk * @@ -108,7 +106,7 @@ public void run() { case UNBLOCKED: fetchNext(true); break; - + case BACKOFF: case BLOCKED_BUT_CAN_BE_PERSISTED: Thread.sleep(DEFAULT_SLEEP_INTERVAL_AFTER_DISPATCHING_IN_MILLS); break; diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/BackendResponse.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/BackendResponse.java new file mode 100644 index 00000000000..ac74ea10cad --- /dev/null +++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/BackendResponse.java @@ -0,0 +1,20 @@ +package com.microsoft.applicationinsights.internal.channel.common; + +/** + * Utility class used by the {@link PartialSuccessHandler} + * + * @author jamdavi + * + */ +class BackendResponse { + + int itemsReceived; + int itemsAccepted; + Error[] errors; + + class Error { + public int index; + public int statusCode; + public String message; + } +} diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ErrorHandler.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ErrorHandler.java new file mode 100644 index 00000000000..265e1266c78 --- /dev/null +++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ErrorHandler.java @@ -0,0 +1,70 @@ +package com.microsoft.applicationinsights.internal.channel.common; + +import com.microsoft.applicationinsights.internal.channel.TransmissionHandler; +import com.microsoft.applicationinsights.internal.channel.TransmissionHandlerArgs; +import com.microsoft.applicationinsights.internal.logger.InternalLogger; + +/** + * This class implements the retry logic for transmissions with the results of a + * 408, 500, and 503 result. + *

+ * It does not handle any error codes such as 400, 401, 403, 404, etc. + * + * @author jamdavi + * + */ +public class ErrorHandler implements TransmissionHandler { + + private TransmissionPolicyManager transmissionPolicyManager; + + /** + * Ctor + * + * Constructs the ErrorHandler object. + * + * @param policy + * The {@link TransmissionPolicyManager} object that is needed to + * control the back off policy + */ + public ErrorHandler(TransmissionPolicyManager policy) { + this.transmissionPolicyManager = policy; + } + + @Override + public void onTransmissionSent(TransmissionHandlerArgs args) { + + validateTransmissionAndSend(args); + } + + boolean validateTransmissionAndSend(TransmissionHandlerArgs args) { + if (args.getTransmission() != null && args.getTransmissionDispatcher() != null) { + args.getTransmission().incrementNumberOfSends(); + switch (args.getResponseCode()) { + case TransmissionSendResult.REQUEST_TIMEOUT: + case TransmissionSendResult.INTERNAL_SERVER_ERROR: + case TransmissionSendResult.SERVICE_UNAVAILABLE: + backoffAndSendTransmission(args); + return true; + default: + InternalLogger.INSTANCE.trace("Http response code %s not handled by %s", args.getResponseCode(), + this.getClass().getName()); + return false; + } + } else if (args.getException() != null) { + backoffAndSendTransmission(args); + return true; + } + return false; + } + + private void backoffAndSendTransmission(TransmissionHandlerArgs args) { + // It is possible for us to have a temporary blip in transmission + // this setting will allow us to control how many instant retries we perform + // before backing off the send + if (args.getTransmission() != null && (args.getTransmission().getNumberOfSends() > transmissionPolicyManager.getMaxInstantRetries())) + { + this.transmissionPolicyManager.backoff(); + } + args.getTransmissionDispatcher().dispatch(args.getTransmission()); + } +} diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/PartialSuccessHandler.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/PartialSuccessHandler.java new file mode 100644 index 00000000000..da1d7ae074a --- /dev/null +++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/PartialSuccessHandler.java @@ -0,0 +1,192 @@ +package com.microsoft.applicationinsights.internal.channel.common; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; +import java.util.zip.GZIPInputStream; + +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.http.HttpStatus; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.common.base.Optional; +import com.microsoft.applicationinsights.internal.channel.TransmissionHandler; +import com.microsoft.applicationinsights.internal.channel.TransmissionHandlerArgs; +import com.microsoft.applicationinsights.internal.logger.InternalLogger; + +/** + * This class implements the retry logic for partially accepted transmissions. + * HTTP status code 206. + *

+ * + * @see PartialSuccessTransmissionPolicy + * @author jamdavi + * + */ +public class PartialSuccessHandler implements TransmissionHandler { + + /** + * Ctor + * + * Constructs the PartialSuccessHandler object. + * + * @param policy + * The {@link TransmissionPolicyManager} object that is needed to + * control the back off policy. + */ + public PartialSuccessHandler(TransmissionPolicyManager policy) { + } + + @Override + public void onTransmissionSent(TransmissionHandlerArgs args) { + validateTransmissionAndSend(args); + } + + /** + * Provides the core logic for the retransmission + * + * @param args + * The {@link TransmissionHandlerArgs} for this transmission. + * @return Returns a pass/fail for handling this transmission. + */ + boolean validateTransmissionAndSend(TransmissionHandlerArgs args) { + if (args.getTransmission() != null && args.getTransmissionDispatcher() != null) { + switch (args.getResponseCode()) { + case HttpStatus.SC_PARTIAL_CONTENT: + BackendResponse backendResponse = getBackendResponse(args.getResponseBody()); + List originalItems = generateOriginalItems(args); + + // Somehow the amount of items received and the items sent do not match + if (backendResponse != null && (originalItems.size() != backendResponse.itemsReceived)) { + InternalLogger.INSTANCE.trace( + "Skipping partial content handler due to itemsReceived being larger than the items sent."); + return false; + } + + if (backendResponse != null && (backendResponse.itemsAccepted < backendResponse.itemsReceived)) { + List newTransmission = new ArrayList(); + for (BackendResponse.Error e : backendResponse.errors) { + switch (e.statusCode) { + case TransmissionSendResult.REQUEST_TIMEOUT: + case TransmissionSendResult.INTERNAL_SERVER_ERROR: + case TransmissionSendResult.SERVICE_UNAVAILABLE: + case TransmissionSendResult.THROTTLED: + case TransmissionSendResult.THROTTLED_OVER_EXTENDED_TIME: + // Unknown condition where backend response returns an index greater than the + // items we're returning + if (e.index < originalItems.size()) { + newTransmission.add(originalItems.get(e.index)); + } + break; + } + } + return sendNewTransmission(args, newTransmission); + } + InternalLogger.INSTANCE + .trace("Skipping partial content handler due to itemsAccepted and itemsReceived being equal."); + return false; + + default: + InternalLogger.INSTANCE.trace("Http response code %s not handled by %s", args.getResponseCode(), + this.getClass().getName()); + return false; + } + } + return false; + } + + /** + * Used to parse the original telemetry request in order to resend the failed + * ones. + * + * @param args + * The {@link TransmissionHandlerArgs} that contains the + * {@link Transmission} object. + * @return A List<> of each sent item + */ + List generateOriginalItems(TransmissionHandlerArgs args) { + List originalItems = new ArrayList(); + + if (args.getTransmission().getWebContentEncodingType() == "gzip") { + + try { + GZIPInputStream gis = new GZIPInputStream( + new ByteArrayInputStream(args.getTransmission().getContent())); + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(gis)); + String line; + while ((line = bufferedReader.readLine()) != null) { + originalItems.add(line); + } + if (gis != null) { + gis.close(); + } + if (bufferedReader != null) { + bufferedReader.close(); + } + } catch (IOException e1) { + InternalLogger.INSTANCE.error("IOException: Error while reading the GZIP stream.%nStack Trace:%n%s", + ExceptionUtils.getStackTrace(e1)); + } catch (Throwable t) { + InternalLogger.INSTANCE.error("Error while reading the GZIP stream.%nStack Trace:%n%s", + ExceptionUtils.getStackTrace(t)); + } finally { + } + } else { + for (String s : new String(args.getTransmission().getContent()).split("\r\n")) { + originalItems.add(s); + } + } + return originalItems; + } + + /** + * Sends a new transmission generated from the failed attempts from the original + * request. + * + * @param args + * The {@link TransmissionHandlerArgs} object that contains the + * {@link TransmissionDispatcher} + * @param newTransmission + * The {@link List} of items to resent + * @return A pass/fail response + */ + boolean sendNewTransmission(TransmissionHandlerArgs args, List newTransmission) { + if (!newTransmission.isEmpty()) { + GzipTelemetrySerializer serializer = new GzipTelemetrySerializer(); + Optional newT = serializer.serialize(newTransmission); + args.getTransmissionDispatcher().dispatch(newT.get()); + return true; + } + return false; + } + + /** + * Helper method to parse the 206 response. Uses {@link Gson} + * + * @param response + * The body of the response. + * @return A {@link BackendResponse} object that contains the status of the + * partial success. + */ + private BackendResponse getBackendResponse(String response) { + + BackendResponse backend = null; + try { + // Parse JSON to Java + GsonBuilder gsonBuilder = new GsonBuilder(); + Gson gson = gsonBuilder.create(); + backend = gson.fromJson(response, BackendResponse.class); + } catch (Throwable t) { + InternalLogger.INSTANCE.trace( + "Error deserializing backend response with Gson.%nStack Trace:%n%s", + ExceptionUtils.getStackTrace(t)); + } finally { + } + return backend; + } +} diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/SenderThreadLocalBackOffData.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/SenderThreadLocalBackOffData.java index 96c24497b13..3beda0bce01 100644 --- a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/SenderThreadLocalBackOffData.java +++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/SenderThreadLocalBackOffData.java @@ -68,7 +68,7 @@ public SenderThreadLocalBackOffData(long[] backOffTimeoutsInMillis, long addMill public boolean isTryingToSend() { return currentBackOffIndex != -1; } - + /** * This method should be called by the Sender thread when the * Transmission is considered as 'done sending', which means the @@ -92,34 +92,68 @@ public void onDoneSending() { * 4. The thread has exhausted all the back-off timeouts */ public boolean backOff() { - try { - lock.lock(); - ++currentBackOffIndex; - if (currentBackOffIndex == backOffTimeoutsInMillis.length) { - currentBackOffIndex = -1; - - // Exhausted the back-offs - return false; - } + try { + lock.lock(); + ++currentBackOffIndex; + if (currentBackOffIndex == backOffTimeoutsInMillis.length) { + currentBackOffIndex = -1; - if (!instanceIsActive) { + // Exhausted the back-offs return false; } - try { - long millisecondsToWait = backOffTimeoutsInMillis[currentBackOffIndex]; - if (millisecondsToWait > BackOffTimesPolicy.MIN_TIME_TO_BACK_OFF_IN_MILLS) { - millisecondsToWait += addMilliseconds; - } - backOffCondition.await(millisecondsToWait, TimeUnit.MILLISECONDS); - return instanceIsActive; - } catch (InterruptedException e) { - return false; + if (!instanceIsActive) { + return false; + } + + try { + long millisecondsToWait = backOffTimeoutsInMillis[currentBackOffIndex]; + if (millisecondsToWait > BackOffTimesPolicy.MIN_TIME_TO_BACK_OFF_IN_MILLS) { + millisecondsToWait += addMilliseconds; + } + backOffCondition.await(millisecondsToWait, TimeUnit.MILLISECONDS); + return instanceIsActive; + } catch (InterruptedException e) { + return false; + } + } finally { + lock.unlock(); + } + } + + /** + * Increment the current back off amount or resets the counter if needed. + *

+ * This method does not block but instead provides the amount of time to sleep which can be used + * in another method. + * @return The number of milliseconds to sleep for. + */ + public long backOffTimerValue() { + try { + lock.lock(); + ++currentBackOffIndex; + if (currentBackOffIndex == backOffTimeoutsInMillis.length) { + currentBackOffIndex = -1; + + // Exhausted the back-offs + return -1; } - } finally { - lock.unlock(); - } - } + + if (!instanceIsActive) { + return 0; + } + + + long millisecondsToWait = backOffTimeoutsInMillis[currentBackOffIndex]; + if (millisecondsToWait > BackOffTimesPolicy.MIN_TIME_TO_BACK_OFF_IN_MILLS) { + millisecondsToWait += addMilliseconds; + } + return millisecondsToWait; + + } finally { + lock.unlock(); + } + } /** * Stop a waiting thread if there is one, and prevent that thread for backOffing. diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/SenderThreadsBackOffManager.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/SenderThreadsBackOffManager.java index b13eb614606..a8e758b23bb 100644 --- a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/SenderThreadsBackOffManager.java +++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/SenderThreadsBackOffManager.java @@ -63,6 +63,11 @@ public void onDoneSending() { SenderThreadLocalBackOffData currentThreadData = this.get(); currentThreadData.onDoneSending(); } + + public long backOffCurrentSenderThreadValue() { + SenderThreadLocalBackOffData currentThreadData = this.get(); + return currentThreadData.backOffTimerValue(); + } public boolean backOffCurrentSenderThread() { SenderThreadLocalBackOffData currentThreadData = this.get(); @@ -75,7 +80,7 @@ public synchronized void stopAllSendersBackOffActivities() { } stopped = true; } - + @Override protected SenderThreadLocalBackOffData initialValue() { int addSeconds = threadsSecondsDifference.incrementAndGet(); diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ThrottlingHandler.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ThrottlingHandler.java new file mode 100644 index 00000000000..4af52a641d8 --- /dev/null +++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ThrottlingHandler.java @@ -0,0 +1,135 @@ +package com.microsoft.applicationinsights.internal.channel.common; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.TimeZone; + +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.http.Header; + +import com.google.common.base.Strings; +import com.microsoft.applicationinsights.internal.channel.TransmissionHandler; +import com.microsoft.applicationinsights.internal.channel.TransmissionHandlerArgs; +import com.microsoft.applicationinsights.internal.logger.InternalLogger; + +/** + * This class implements the retry logic for throttled requests. HTTP status + * code 429 and 439. + * + * @author jamdavi + * + */ +public class ThrottlingHandler implements TransmissionHandler { + + private TransmissionPolicyManager transmissionPolicyManager; + private final static String RESPONSE_RETRY_AFTER_DATE_FORMAT = "E, dd MMM yyyy HH:mm:ss"; + + /** + * Ctor + * + * Constructs the ThrottlingHandler object. + * + * @param policy + * The {@link TransmissionPolicyManager} object that is needed to + * control the back off policy. + */ + public ThrottlingHandler(TransmissionPolicyManager policy) { + this.transmissionPolicyManager = policy; + } + + @Override + public void onTransmissionSent(TransmissionHandlerArgs args) { + validateTransmissionAndSend(args); + } + + /** + * Provides the core logic for the retransmission + * + * @param args + * The {@link TransmissionHandlerArgs} for this transmission. + * @return Returns a pass/fail for handling this transmission. + */ + boolean validateTransmissionAndSend(TransmissionHandlerArgs args) { + if (args.getRetryHeader() != null && args.getTransmission() != null + && args.getTransmissionDispatcher() != null) { + args.getTransmission().incrementNumberOfSends(); + switch (args.getResponseCode()) { + case TransmissionSendResult.THROTTLED: + case TransmissionSendResult.THROTTLED_OVER_EXTENDED_TIME: + suspendTransmissions(TransmissionPolicy.BLOCKED_BUT_CAN_BE_PERSISTED, args.getRetryHeader()); + args.getTransmissionDispatcher().dispatch(args.getTransmission()); + return true; + default: + InternalLogger.INSTANCE.trace("Http response code %s not handled by %s", args.getResponseCode(), + this.getClass().getName()); + return false; + } + } + InternalLogger.INSTANCE.trace("Http response code %s not handled by %s.", args.getResponseCode(), + this.getClass().getName()); + return false; + } + + /** + * Used to put the sender thread to sleep for the specified duration in the + * Retry-After header. + * + * @param suspensionPolicy + * The policy used to suspend the threads. For now we use + * {@link TransmissionPolicy.BLOCKED_BUT_CAN_BE_PERSISTED} for reuse + * of the existing logic. + * @param retryAfterHeader + * The header that is captured from the HTTP response. + */ + private void suspendTransmissions(TransmissionPolicy suspensionPolicy, Header retryAfterHeader) { + + if (retryAfterHeader == null) { + return; + } + String retryAfterAsString = retryAfterHeader.getValue(); + if (Strings.isNullOrEmpty(retryAfterAsString)) { + return; + } + + try { + DateFormat formatter = new SimpleDateFormat(RESPONSE_RETRY_AFTER_DATE_FORMAT); + Date date = formatter.parse(retryAfterAsString); + + Date now = Calendar.getInstance().getTime(); + long retryAfterAsSeconds = (date.getTime() - convertToDateToGmt(now).getTime()) / 1000; + this.transmissionPolicyManager.suspendInSeconds(suspensionPolicy, retryAfterAsSeconds); + } catch (Throwable e) { + InternalLogger.INSTANCE.error("Throttled but failed to block transmission.%nStack Trace:%n%s", + ExceptionUtils.getStackTrace(e)); + this.transmissionPolicyManager.backoff(); + } + + } + + /** + * Converts parsed date value to GMT for the {@link suspendTransmissions} + * method. + * + * @param date + * The date to convert to GMT + * @return The corrected date. + */ + private static Date convertToDateToGmt(Date date) { + TimeZone tz = TimeZone.getDefault(); + Date ret = new Date(date.getTime() - tz.getRawOffset()); + + // If we are now in DST, back off by the delta. Note that we are checking the + // GMT date, this is the KEY. + if (tz.inDaylightTime(ret)) { + Date dstDate = new Date(ret.getTime() - tz.getDSTSavings()); + + // Check to make sure we have not crossed back into standard time + if (tz.inDaylightTime(dstDate)) { + ret = dstDate; + } + } + return ret; + } +} diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionFileSystemOutput.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionFileSystemOutput.java index f6da9b9bd0e..e113a2988bd 100644 --- a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionFileSystemOutput.java +++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionFileSystemOutput.java @@ -78,7 +78,7 @@ public final class TransmissionFileSystemOutput implements TransmissionOutput { private final static int DELETE_TIMEOUT_ON_FAILURE_IN_MILLS = 100; private final static int DEFAULT_CAPACITY_MEGABYTES = 10; - private final static int MAX_CAPACITY_MEGABYTES = 100; + private final static int MAX_CAPACITY_MEGABYTES = 1000; private final static int MIN_CAPACITY_MEGABYTES = 1; private static final String MAX_TRANSMISSION_STORAGE_CAPACITY_NAME = "Channel.MaxTransmissionStorageCapacityInMB"; @@ -135,6 +135,7 @@ public TransmissionFileSystemOutput(String folderPath) { @Override public boolean send(Transmission transmission) { if (size.get() >= capacityInKB) { + InternalLogger.INSTANCE.logAlways(InternalLogger.LoggingLevel.WARN, "Persistent storage max capcity has been reached; currently at %s KB. Telemetry will be lost, please set the MaxTransmissionStorageFilesCapacityInMB property in the configuration file.", size.get()); return false; } @@ -151,6 +152,7 @@ public boolean send(Transmission transmission) { return false; } + InternalLogger.INSTANCE.logAlways(InternalLogger.LoggingLevel.TRACE, "Persistent storage saved permanent file; data will be persisted and sent when the network is available."); return true; } diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionNetworkOutput.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionNetworkOutput.java index 2247c6356bf..8f7bd26028d 100644 --- a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionNetworkOutput.java +++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionNetworkOutput.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.microsoft.applicationinsights.internal.channel.TransmissionDispatcher; +import com.microsoft.applicationinsights.internal.channel.TransmissionHandlerArgs; import com.microsoft.applicationinsights.internal.channel.TransmissionOutput; import com.microsoft.applicationinsights.internal.logger.InternalLogger; import org.apache.http.Header; @@ -33,294 +34,241 @@ import org.apache.http.client.methods.HttpPost; import org.apache.http.conn.ConnectionPoolTimeoutException; import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.util.EntityUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.net.SocketException; import java.net.UnknownHostException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.Calendar; -import java.util.Date; -import java.util.TimeZone; import java.util.concurrent.TimeUnit; /** - * The class is responsible for the actual sending of {@link com.microsoft.applicationinsights.internal.channel.common.Transmission} + * The class is responsible for the actual sending of + * {@link com.microsoft.applicationinsights.internal.channel.common.Transmission} * * The class uses Apache's HttpClient framework for that. * * Created by gupele on 12/18/2014. */ public final class TransmissionNetworkOutput implements TransmissionOutput { - private final static String CONTENT_TYPE_HEADER = "Content-Type"; - private final static String CONTENT_ENCODING_HEADER = "Content-Encoding"; - private final static String RESPONSE_THROTTLING_HEADER = "Retry-After"; - private final static String RESPONSE_RETRY_AFTER_DATE_FORMAT = "E, dd MMM yyyy HH:mm:ss"; + private static final int MAX_RESEND = 3; + private final static String CONTENT_TYPE_HEADER = "Content-Type"; + private final static String CONTENT_ENCODING_HEADER = "Content-Encoding"; + private final static String RESPONSE_THROTTLING_HEADER = "Retry-After"; + + private final static String DEFAULT_SERVER_URI = "https://dc.services.visualstudio.com/v2/track"; + + // For future use: re-send a failed transmission back to the dispatcher + private TransmissionDispatcher transmissionDispatcher; + + private final String serverUri; + + private volatile boolean stopped; + + // Use one instance for optimization + private final ApacheSender httpClient; + + private TransmissionPolicyManager transmissionPolicyManager; + + /** + * Creates an instance of the network transmission class. + *

+ * Will use the DEFAULT_SERVER_URI for the endpoint. + * + * @param transmissionPolicyManager + * The transmission policy used to mark this sender active or + * blocked. + * @return + */ + public static TransmissionNetworkOutput create(TransmissionPolicyManager transmissionPolicyManager) { + return create(DEFAULT_SERVER_URI, transmissionPolicyManager); + } + + /** + * Creates an instance of the network transmission class. + * + * @param endpoint + * The HTTP endpoint to send our telemetry too. + * @param transmissionPolicyManager + * The transmission policy used to mark this sender active or + * blocked. + * @return + */ + public static TransmissionNetworkOutput create(String endpoint, + TransmissionPolicyManager transmissionPolicyManager) { + String realEndpoint = Strings.isNullOrEmpty(endpoint) ? DEFAULT_SERVER_URI : endpoint; + return new TransmissionNetworkOutput(realEndpoint, transmissionPolicyManager); + } + + /** + * Private Ctor to initialize class. + *

+ * Also creates the httpClient using the ApacheSender instance + * + * @param serverUri + * The HTTP endpoint to send our telemetry too. + * @param transmissionPolicyManager + */ + private TransmissionNetworkOutput(String serverUri, TransmissionPolicyManager transmissionPolicyManager) { + Preconditions.checkNotNull(serverUri, "serverUri should be a valid non-null value"); + Preconditions.checkArgument(!Strings.isNullOrEmpty(serverUri), "serverUri should be a valid non-null value"); + Preconditions.checkNotNull(transmissionPolicyManager, + "transmissionPolicyManager should be a valid non-null value"); + + this.serverUri = serverUri; + + httpClient = ApacheSenderFactory.INSTANCE.create(); + this.transmissionPolicyManager = transmissionPolicyManager; + stopped = false; + + } + + /** + * Used to inject the dispatcher used for this output so it can be injected to + * the retry logic. + * + * @param transmissionDispatcher + * The dispatcher to be injected. + */ + public void setTransmissionDispatcher(TransmissionDispatcher transmissionDispatcher) { + this.transmissionDispatcher = transmissionDispatcher; + } + + /** + * Stops all threads from sending data. + * + * @param timeout + * The timeout to wait, which is not relevant here. + * @param timeUnit + * The time unit, which is not relevant in this method. + */ + @Override + public synchronized void stop(long timeout, TimeUnit timeUnit) { + if (stopped) { + return; + } + + httpClient.close(); + stopped = true; + } + + /** + * Tries to send a + * {@link com.microsoft.applicationinsights.internal.channel.common.Transmission} + * The thread that calls that method might be suspended if there is a throttling + * issues, in any case the thread that enters this method is responsive for + * 'stop' request that might be issued by the application. + * + * @param transmission + * The data to send + * @return True when done. + */ + @Override + public boolean send(Transmission transmission) { + if (!stopped) { + // If we're not stopped but in a blocked state then fail to second + // TransmissionOutput + if (transmissionPolicyManager.getTransmissionPolicyState() + .getCurrentState() != TransmissionPolicy.UNBLOCKED) { + return false; + } + + HttpResponse response = null; + HttpPost request = null; + int code = 0; + String respString = null; + Throwable ex = null; + Header retryAfterHeader = null; + try { + // POST the transmission data to the endpoint + request = createTransmissionPostRequest(transmission); + httpClient.enhanceRequest(request); + response = httpClient.sendPostRequest(request); + HttpEntity respEntity = response.getEntity(); + code = response.getStatusLine().getStatusCode(); + respString = EntityUtils.toString(respEntity); + retryAfterHeader = response.getFirstHeader(RESPONSE_THROTTLING_HEADER); + + // After the third time through this dispatcher we should reset the counter and + // then fail to second TransmissionOutput + if (code > HttpStatus.SC_PARTIAL_CONTENT && transmission.getNumberOfSends() >= MAX_RESEND) { + transmission.setNumberOfSends(0); + return false; + } else if (code == HttpStatus.SC_OK) { + // If we've completed then clear the back off flags as the channel does not need + // to be throttled + transmissionPolicyManager.clearBackoff(); + } + return true; + + } catch (ConnectionPoolTimeoutException e) { + ex = e; + InternalLogger.INSTANCE.error("Failed to send, connection pool timeout exception%nStack Trace:%n%s", + ExceptionUtils.getStackTrace(e)); + } catch (SocketException e) { + ex = e; + InternalLogger.INSTANCE.error("Failed to send, socket exception.%nStack Trace:%n%s", + ExceptionUtils.getStackTrace(e)); + } catch (UnknownHostException e) { + ex = e; + InternalLogger.INSTANCE.error( + "Failed to send, wrong host address or cannot reach address due to network issues.%nStack Trace:%n%s", + ExceptionUtils.getStackTrace(e)); + } catch (IOException ioe) { + ex = ioe; + InternalLogger.INSTANCE.error("Failed to send.%nStack Trace:%n%s", ExceptionUtils.getStackTrace(ioe)); + } catch (Exception e) { + ex = e; + InternalLogger.INSTANCE.error("Failed to send, unexpected exception.%nStack Trace:%n%s", + ExceptionUtils.getStackTrace(e)); + } catch (Throwable t) { + ex = t; + InternalLogger.INSTANCE.error("Failed to send, unexpected error.%nStack Trace:%n%s", + ExceptionUtils.getStackTrace(t)); + } finally { + if (request != null) { + request.releaseConnection(); + } + httpClient.dispose(response); + + if (code != HttpStatus.SC_OK && transmission.getNumberOfSends() < MAX_RESEND) { + // Invoke the listeners for handling things like errors + // The listeners will handle the back off logic as well as the dispatch + // operation + TransmissionHandlerArgs args = new TransmissionHandlerArgs(); + args.setTransmission(transmission); + args.setTransmissionDispatcher(transmissionDispatcher); + args.setResponseBody(respString); + args.setResponseCode(code); + args.setException(ex); + args.setRetryHeader(retryAfterHeader); + this.transmissionPolicyManager.onTransmissionSent(args); + } + } + } + // If we end up here we've hit an error code we do not expect (403, 401, 400, + // etc.) + // This also means that unless there is a TransmissionHandler for this code we + // will not retry. + return true; + } + + /** + * Generates the HTTP POST to send to the endpoint. + * + * @param transmission + * The transmission to send. + * @return The completed {@link HttpPost} object + */ + private HttpPost createTransmissionPostRequest(Transmission transmission) { + HttpPost request = new HttpPost(serverUri); + request.addHeader(CONTENT_TYPE_HEADER, transmission.getWebContentType()); + request.addHeader(CONTENT_ENCODING_HEADER, transmission.getWebContentEncodingType()); + + ByteArrayEntity bae = new ByteArrayEntity(transmission.getContent()); + request.setEntity(bae); + + return request; + } - private final static String DEFAULT_SERVER_URI = "https://dc.services.visualstudio.com/v2/track"; - private final static int DEFAULT_BACKOFF_TIME_SECONDS = 300; - - // For future use: re-send a failed transmission back to the dispatcher - private TransmissionDispatcher transmissionDispatcher; - - private final String serverUri; - - private volatile boolean stopped; - - // Use one instance for optimization - private final ApacheSender httpClient; - - private TransmissionPolicyManager transmissionPolicyManager; - - public static TransmissionNetworkOutput create(TransmissionPolicyManager transmissionPolicyManager) { - return create(DEFAULT_SERVER_URI, transmissionPolicyManager); - } - - public static TransmissionNetworkOutput create(String endpoint, TransmissionPolicyManager transmissionPolicyManager) { - String realEndpoint = Strings.isNullOrEmpty(endpoint) ? DEFAULT_SERVER_URI : endpoint; - return new TransmissionNetworkOutput(realEndpoint, transmissionPolicyManager); - } - - private TransmissionNetworkOutput(String serverUri, TransmissionPolicyManager transmissionPolicyManager) { - Preconditions.checkNotNull(serverUri, "serverUri should be a valid non-null value"); - Preconditions.checkArgument(!Strings.isNullOrEmpty(serverUri), "serverUri should be a valid non-null value"); - Preconditions.checkNotNull(transmissionPolicyManager, "transmissionPolicyManager should be a valid non-null value"); - - this.serverUri = serverUri; - - httpClient = ApacheSenderFactory.INSTANCE.create(); - this.transmissionPolicyManager = transmissionPolicyManager; - stopped = false; - } - - public void setTransmissionDispatcher(TransmissionDispatcher transmissionDispatcher) { - this.transmissionDispatcher = transmissionDispatcher; - } - - /** - * Stops all threads from sending data. - * @param timeout The timeout to wait, which is not relevant here. - * @param timeUnit The time unit, which is not relevant in this method. - */ - @Override - public synchronized void stop(long timeout, TimeUnit timeUnit) { - if (stopped) { - return; - } - - httpClient.close(); - stopped = true; - } - - /** - * Tries to send a {@link com.microsoft.applicationinsights.internal.channel.common.Transmission} - * The thread that calls that method might be suspended if there is a throttling issues, in any case - * the thread that enters this method is responsive for 'stop' request that might be issued by the application. - * @param transmission The data to send - * @return True when done. - */ - @Override - public boolean send(Transmission transmission) { - while (!stopped) { - if (transmissionPolicyManager.getTransmissionPolicyState().getCurrentState() != TransmissionPolicy.UNBLOCKED) { - return false; - } - - HttpResponse response = null; - HttpPost request = null; - boolean shouldBackoff = false; - try { - request = createTransmissionPostRequest(transmission); - httpClient.enhanceRequest(request); - - response = httpClient.sendPostRequest(request); - - HttpEntity respEntity = response.getEntity(); - int code = response.getStatusLine().getStatusCode(); - - TransmissionSendResult sendResult = translateResponse(code, respEntity); - switch (sendResult) { - case PAYMENT_REQUIRED: - case THROTTLED: - suspendTransmissions(TransmissionPolicy.BLOCKED_BUT_CAN_BE_PERSISTED, response); - break; - - case THROTTLED_OVER_EXTENDED_TIME: - suspendTransmissions(TransmissionPolicy.BLOCKED_AND_CANNOT_BE_PERSISTED, response); - break; - - default: - return true; - } - } catch (ConnectionPoolTimeoutException e) { - InternalLogger.INSTANCE.error("Failed to send, connection pool timeout exception"); - shouldBackoff = true; - } catch (SocketException e) { - InternalLogger.INSTANCE.error("Failed to send, socket timeout exception"); - shouldBackoff = true; - } catch (UnknownHostException e) { - InternalLogger.INSTANCE.error("Failed to send, wrong host address or cannot reach address due to network issues, exception: %s", e.getMessage()); - shouldBackoff = true; - } catch (IOException ioe) { - InternalLogger.INSTANCE.error("Failed to send, exception: %s", ioe.getMessage()); - shouldBackoff = true; - } catch (Exception e) { - InternalLogger.INSTANCE.error("Failed to send, unexpected exception: %s", e.getMessage()); - shouldBackoff = true; - } catch (Throwable t) { - InternalLogger.INSTANCE.error("Failed to send, unexpected error: %s", t.getMessage()); - shouldBackoff = true; - } - finally { - if (request != null) { - request.releaseConnection(); - } - httpClient.dispose(response); - // backoff before trying again - if (shouldBackoff) { - InternalLogger.INSTANCE.trace("Backing off for %s seconds", DEFAULT_BACKOFF_TIME_SECONDS); - transmissionPolicyManager.suspendInSeconds(TransmissionPolicy.BLOCKED_BUT_CAN_BE_PERSISTED, DEFAULT_BACKOFF_TIME_SECONDS); - } - } - } - - return true; - } - - private void suspendTransmissions(TransmissionPolicy suspensionPolicy, HttpResponse response) { - Header retryAfterHeader = response.getFirstHeader(RESPONSE_THROTTLING_HEADER); - if (retryAfterHeader == null) { - return; - } - - String retryAfterAsString = retryAfterHeader.getValue(); - if (Strings.isNullOrEmpty(retryAfterAsString)) { - return; - } - - try { - DateFormat formatter = new SimpleDateFormat(RESPONSE_RETRY_AFTER_DATE_FORMAT); - Date date = formatter.parse(retryAfterAsString); - - Date now = Calendar.getInstance().getTime(); - long retryAfterAsSeconds = (date.getTime() - convertToDateToGmt(now).getTime())/1000; - transmissionPolicyManager.suspendInSeconds(suspensionPolicy, retryAfterAsSeconds); - } catch (Throwable e) { - InternalLogger.INSTANCE.logAlways(InternalLogger.LoggingLevel.ERROR, "Throttled but failed to block transmission, exception: %s", e.getMessage()); - } - } - - private static Date convertToDateToGmt(Date date){ - TimeZone tz = TimeZone.getDefault(); - Date ret = new Date(date.getTime() - tz.getRawOffset()); - - // If we are now in DST, back off by the delta. Note that we are checking the GMT date, this is the KEY. - if (tz.inDaylightTime(ret)) { - Date dstDate = new Date(ret.getTime() - tz.getDSTSavings()); - - // Check to make sure we have not crossed back into standard time - if (tz.inDaylightTime(dstDate)) { - ret = dstDate; - } - } - return ret; - } - - private TransmissionSendResult translateResponse(int code, HttpEntity respEntity) { - if (code == HttpStatus.SC_OK) { - return TransmissionSendResult.SENT_SUCCESSFULLY; - } - - TransmissionSendResult result; - - String errorMessage; - if (code < HttpStatus.SC_OK || - (code >= HttpStatus.SC_MULTIPLE_CHOICES && code < HttpStatus.SC_BAD_REQUEST) || - code > HttpStatus.SC_INTERNAL_SERVER_ERROR) { - - errorMessage = String.format("Unexpected response code: %d", code); - result = TransmissionSendResult.REJECTED_BY_SERVER; - } else { - switch (code) { - case HttpStatus.SC_BAD_REQUEST: - errorMessage = "Bad request "; - result = TransmissionSendResult.BAD_REQUEST; - break; - - case 429: - result = TransmissionSendResult.THROTTLED; - errorMessage = "Throttling (All messages of the transmission were rejected) "; - break; - - case 439: - result = TransmissionSendResult.THROTTLED_OVER_EXTENDED_TIME; - errorMessage = "Throttling extended"; - break; - - case 402: - result = TransmissionSendResult.PAYMENT_REQUIRED; - errorMessage = "Throttling: payment required"; - break; - - case HttpStatus.SC_PARTIAL_CONTENT: - result = TransmissionSendResult.PARTIALLY_THROTTLED; - errorMessage = "Throttling (Partial messages of the transmission were rejected) "; - break; - - case HttpStatus.SC_INTERNAL_SERVER_ERROR: - errorMessage = "Internal server error "; - result = TransmissionSendResult.INTERNAL_SERVER_ERROR; - break; - - default: - result = TransmissionSendResult.REJECTED_BY_SERVER; - errorMessage = String.format("Error, response code: %d", code); - break; - } - } - - logError(errorMessage, respEntity); - return result; - } - - private void logError(String baseErrorMessage, HttpEntity respEntity) { - if (respEntity == null || !InternalLogger.INSTANCE.isErrorEnabled()) { - InternalLogger.INSTANCE.error(baseErrorMessage); - return; - } - - InputStream inputStream = null; - try { - inputStream = respEntity.getContent(); - InputStreamReader streamReader = new InputStreamReader(inputStream, "UTF-8"); - BufferedReader reader = new BufferedReader(streamReader); - String responseLine = reader.readLine(); - respEntity.getContent().close(); - - InternalLogger.INSTANCE.error("Failed to send, %s : %s", baseErrorMessage, responseLine); - } catch (IOException e) { - InternalLogger.INSTANCE.error("Failed to send, %s, failed to log the error", baseErrorMessage); - } finally { - if (inputStream != null) { - try { - inputStream.close(); - } catch (IOException e) { - } - } - } - } - - private HttpPost createTransmissionPostRequest(Transmission transmission) { - HttpPost request = new HttpPost(serverUri); - request.addHeader(CONTENT_TYPE_HEADER, transmission.getWebContentType()); - request.addHeader(CONTENT_ENCODING_HEADER, transmission.getWebContentEncodingType()); - - ByteArrayEntity bae = new ByteArrayEntity(transmission.getContent()); - request.setEntity(bae); - - return request; - } } diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionPolicy.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionPolicy.java index b655e933345..6e31644a139 100644 --- a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionPolicy.java +++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionPolicy.java @@ -27,5 +27,6 @@ enum TransmissionPolicy { UNBLOCKED, BLOCKED_BUT_CAN_BE_PERSISTED, - BLOCKED_AND_CANNOT_BE_PERSISTED + BLOCKED_AND_CANNOT_BE_PERSISTED, + BACKOFF } diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionPolicyManager.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionPolicyManager.java index 473e6ebe872..8f847fad330 100644 --- a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionPolicyManager.java +++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionPolicyManager.java @@ -21,20 +21,24 @@ package com.microsoft.applicationinsights.internal.channel.common; +import java.util.ArrayList; import java.util.Calendar; import java.util.Date; +import java.util.List; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.base.Preconditions; +import com.microsoft.applicationinsights.internal.channel.TransmissionHandler; +import com.microsoft.applicationinsights.internal.channel.TransmissionHandlerArgs; +import com.microsoft.applicationinsights.internal.channel.TransmissionHandlerObserver; import com.microsoft.applicationinsights.internal.logger.InternalLogger; import com.microsoft.applicationinsights.internal.shutdown.SDKShutdownActivity; import com.microsoft.applicationinsights.internal.shutdown.Stoppable; import com.microsoft.applicationinsights.internal.util.ThreadPoolUtils; -import com.google.common.base.Preconditions; - /** * This class is responsible for managing the transmission state. * @@ -46,8 +50,17 @@ * * Created by gupele on 6/29/2015. */ -public final class TransmissionPolicyManager implements Stoppable { - +public final class TransmissionPolicyManager implements Stoppable, TransmissionHandlerObserver { + + private int INSTANT_RETRY_AMOUNT = 3; // Should always be set by the creator of this class + private int INSTANT_RETRY_MAX = 10; // Stops us from getting into an endless loop + + // Current thread backoff manager + private SenderThreadsBackOffManager backoffManager; + + // List of transmission policies implemented as handlers + private List transmissionHandlers; + // The future date the the transmission is blocked private Date suspensionDate; @@ -80,11 +93,45 @@ public void run() { } } + /** + * Create the {@link TransmissionPolicyManager} and set the ability to throttle. + * @param throttlingIsEnabled Set whether the {@link TransmissionPolicyManager} can be throttled. + */ public TransmissionPolicyManager(boolean throttlingIsEnabled) { suspensionDate = null; this.throttlingIsEnabled = throttlingIsEnabled; + this.transmissionHandlers = new ArrayList(); + this.backoffManager = new SenderThreadsBackOffManager(new ExponentialBackOffTimesPolicy()); } + /** + * Suspend the transmission thread according to the current back off policy. + */ + public void backoff() { + policyState.setCurrentState(TransmissionPolicy.BACKOFF); + long backOffMillis = backoffManager.backOffCurrentSenderThreadValue(); + if (backOffMillis > 0) + { + long backOffSeconds = backOffMillis / 1000; + InternalLogger.INSTANCE.logAlways(InternalLogger.LoggingLevel.TRACE, "App is throttled, telemetry will be blocked for %s seconds.", backOffSeconds); + this.suspendInSeconds(TransmissionPolicy.BACKOFF, backOffSeconds); + } + } + + /** + * Clear the current thread state and and reset the back off counter. + */ + public void clearBackoff() { + policyState.setCurrentState(TransmissionPolicy.UNBLOCKED); + backoffManager.onDoneSending(); + InternalLogger.INSTANCE.logAlways(InternalLogger.LoggingLevel.TRACE, "Backoff has been reset."); + } + + /** + * Suspend this transmission thread using the specified policy + * @param policy The {@link TransmissionPolicy} to use for suspension + * @param suspendInSeconds The number of seconds to suspend. + */ public void suspendInSeconds(TransmissionPolicy policy, long suspendInSeconds) { if (!throttlingIsEnabled) { return; @@ -97,11 +144,18 @@ public void suspendInSeconds(TransmissionPolicy policy, long suspendInSeconds) { doSuspend(policy, suspendInSeconds); } + /** + * Stop this transmission thread from sending. + */ @Override public synchronized void stop(long timeout, TimeUnit timeUnit) { ThreadPoolUtils.stop(threads, timeout, timeUnit); } + /** + * Get the policy state fetcher + * @return A {@link TransmissionPolicyStateFetcher} object + */ public TransmissionPolicyStateFetcher getTransmissionPolicyState() { return policyState; } @@ -111,7 +165,7 @@ private synchronized void doSuspend(TransmissionPolicy policy, long suspendInSec if (policy == TransmissionPolicy.UNBLOCKED ) { return; } - + Date date = Calendar.getInstance().getTime(); date.setTime(date.getTime() + 1000 * suspendInSeconds); if (this.suspensionDate != null) { @@ -160,4 +214,36 @@ public Thread newThread(Runnable r) { SDKShutdownActivity.INSTANCE.register(this); } + + @Override + public void onTransmissionSent(TransmissionHandlerArgs transmissionArgs) { + for (TransmissionHandler handler : this.transmissionHandlers) { + handler.onTransmissionSent(transmissionArgs); + } + } + + @Override + public void addTransmissionHandler(TransmissionHandler handler) { + if(handler != null) { + this.transmissionHandlers.add(handler); + } + } + + /** + * Set the number of retries before performing a back off operation. + * @param maxInstantRetries Number of retries + */ + public void setMaxInstantRetries(int maxInstantRetries) { + if (maxInstantRetries >= 0 && maxInstantRetries < INSTANT_RETRY_MAX) { + INSTANT_RETRY_AMOUNT = maxInstantRetries; + } + } + + /** + * Get the number of retries before performing a back off operation. + * @return Number of retries + */ + public int getMaxInstantRetries() { + return INSTANT_RETRY_AMOUNT; + } } diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionSendResult.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionSendResult.java index 6a66f8dc755..f87e5595a4a 100644 --- a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionSendResult.java +++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionSendResult.java @@ -24,26 +24,12 @@ /** * Created by gupele on 2/10/2015. */ -enum TransmissionSendResult { - SENT_SUCCESSFULLY, - - FAILED_TO_SEND_DUE_TO_NETWORK_ISSUES, - FAILED_TO_SEND_DUE_TO_CONNECTION_POOL, - - FAILED_TO_RECEIVE_DUE_TO_TIMEOUT, - - FAILED_TO_READ_RESPONSE, - - // Got response - BAD_REQUEST, - INTERNAL_SERVER_ERROR, - - THROTTLED, - THROTTLED_OVER_EXTENDED_TIME, - PAYMENT_REQUIRED, - - PARTIALLY_THROTTLED, - - REJECTED_BY_SERVER, - UNKNOWN_ERROR, +final class TransmissionSendResult { + public final static int SENT_SUCCESSFULLY = 200; + public final static int PARTIAL_SUCCESS = 206; + public final static int REQUEST_TIMEOUT = 408; + public final static int THROTTLED = 429; + public final static int THROTTLED_OVER_EXTENDED_TIME = 439; + public final static int INTERNAL_SERVER_ERROR = 500; + public final static int SERVICE_UNAVAILABLE = 503; } diff --git a/core/src/test/java/com/microsoft/applicationinsights/extensibility/initializer/docker/Constants.java b/core/src/test/java/com/microsoft/applicationinsights/extensibility/initializer/docker/ConstantsTest.java similarity index 98% rename from core/src/test/java/com/microsoft/applicationinsights/extensibility/initializer/docker/Constants.java rename to core/src/test/java/com/microsoft/applicationinsights/extensibility/initializer/docker/ConstantsTest.java index 27ae7f7d3bb..421d682e0a8 100644 --- a/core/src/test/java/com/microsoft/applicationinsights/extensibility/initializer/docker/Constants.java +++ b/core/src/test/java/com/microsoft/applicationinsights/extensibility/initializer/docker/ConstantsTest.java @@ -24,6 +24,6 @@ /** * Created by yonisha on 8/2/2015. */ -public class Constants { +public class ConstantsTest { public static final String CONTEXT_FILE_PATTERN = "Docker host=%s,Docker image=%s,Docker container name=%s,Docker container id=%s"; } diff --git a/core/src/test/java/com/microsoft/applicationinsights/extensibility/initializer/docker/DockerContextInitializerTests.java b/core/src/test/java/com/microsoft/applicationinsights/extensibility/initializer/docker/DockerContextInitializerTests.java index a5f788938b6..f11e7d8fbdb 100644 --- a/core/src/test/java/com/microsoft/applicationinsights/extensibility/initializer/docker/DockerContextInitializerTests.java +++ b/core/src/test/java/com/microsoft/applicationinsights/extensibility/initializer/docker/DockerContextInitializerTests.java @@ -56,7 +56,7 @@ public class DockerContextInitializerTests { @BeforeClass public static void classInit() throws Exception { - String json = String.format(com.microsoft.applicationinsights.extensibility.initializer.docker.Constants.CONTEXT_FILE_PATTERN, DEFAULT_HOST, DEFAULT_IMAGE, DEFAULT_CONTAINER_NAME, DEFAULT_CONTAINER_ID); + String json = String.format(com.microsoft.applicationinsights.extensibility.initializer.docker.ConstantsTest.CONTEXT_FILE_PATTERN, DEFAULT_HOST, DEFAULT_IMAGE, DEFAULT_CONTAINER_NAME, DEFAULT_CONTAINER_ID); defaultDockerContext = new DockerContext(json); initializerUnderTest = new DockerContextInitializer(fileFactoryMock, contextPollerMock); } diff --git a/core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/ErrorHandlerTest.java b/core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/ErrorHandlerTest.java new file mode 100644 index 00000000000..aa74faa7acd --- /dev/null +++ b/core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/ErrorHandlerTest.java @@ -0,0 +1,107 @@ +package com.microsoft.applicationinsights.internal.channel.common; + + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import com.microsoft.applicationinsights.internal.channel.TransmissionDispatcher; +import com.microsoft.applicationinsights.internal.channel.TransmissionHandlerArgs; +import com.microsoft.applicationinsights.internal.channel.common.ErrorHandler; +import com.microsoft.applicationinsights.internal.channel.common.Transmission; +import com.microsoft.applicationinsights.internal.channel.common.TransmissionPolicyManager; + + +public class ErrorHandlerTest { + + + private boolean generateTransmissionWithStatusCode(int code) { + TransmissionPolicyManager tpm = new TransmissionPolicyManager(true); + TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class); + TransmissionHandlerArgs args = new TransmissionHandlerArgs(); + args.setResponseCode(code); + args.setTransmission(new Transmission(new byte[] { 0 }, "testcontent", "testencoding")); + args.setTransmissionDispatcher(mockedDispatcher); + ErrorHandler eh = new ErrorHandler(tpm); + boolean result = eh.validateTransmissionAndSend(args); + return result; + } + + @Test + public void failOnNull() { + TransmissionPolicyManager tpm = new TransmissionPolicyManager(true); + TransmissionHandlerArgs args = new TransmissionHandlerArgs(); + ErrorHandler eh = new ErrorHandler(tpm); + boolean result = eh.validateTransmissionAndSend(args); + Assert.assertFalse(result); + } + + @Test + public void fail200Status() { + boolean result = generateTransmissionWithStatusCode(200); + Assert.assertFalse(result); + } + + @Test + public void fail206Status() { + boolean result = generateTransmissionWithStatusCode(206); + Assert.assertFalse(result); + } + + @Test + public void fail400Status() { + boolean result = generateTransmissionWithStatusCode(400); + Assert.assertFalse(result); + } + + @Test + public void fail404Status() { + boolean result = generateTransmissionWithStatusCode(404); + Assert.assertFalse(result); + } + + @Test + public void fail429Status() { + boolean result = generateTransmissionWithStatusCode(429); + Assert.assertFalse(result); + } + + @Test + public void fail439Status() { + boolean result = generateTransmissionWithStatusCode(439); + Assert.assertFalse(result); + } + + @Test + public void pass408Status() { + boolean result = generateTransmissionWithStatusCode(408); + Assert.assertTrue(result); + } + + @Test + public void pass500Status() { + boolean result = generateTransmissionWithStatusCode(500); + Assert.assertTrue(result); + } + + @Test + public void pass503Status() { + boolean result = generateTransmissionWithStatusCode(503); + Assert.assertTrue(result); + } + + @Test + public void passException() { + TransmissionPolicyManager tpm = new TransmissionPolicyManager(true); + TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class); + TransmissionHandlerArgs args = new TransmissionHandlerArgs(); + args.setResponseCode(0); + args.setTransmission(null); + args.setTransmissionDispatcher(mockedDispatcher); + args.setException(new Exception("Mocked")); + ErrorHandler eh = new ErrorHandler(tpm); + boolean result = eh.validateTransmissionAndSend(args); + Assert.assertTrue(result); + } + +} diff --git a/core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/PartialSuccessHandlerTest.java b/core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/PartialSuccessHandlerTest.java new file mode 100644 index 00000000000..3731d41da46 --- /dev/null +++ b/core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/PartialSuccessHandlerTest.java @@ -0,0 +1,266 @@ +package com.microsoft.applicationinsights.internal.channel.common; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import com.microsoft.applicationinsights.internal.channel.TransmissionDispatcher; +import com.microsoft.applicationinsights.internal.channel.TransmissionHandlerArgs; +import com.microsoft.applicationinsights.internal.channel.common.ErrorHandler; +import com.microsoft.applicationinsights.internal.channel.common.PartialSuccessHandler; +import com.microsoft.applicationinsights.internal.channel.common.Transmission; +import com.microsoft.applicationinsights.internal.channel.common.TransmissionPolicyManager; + +public class PartialSuccessHandlerTest { + + private final byte[] fourItems = new byte[] {31, -117, 8, 0, 0, 0, 0, 0, 0, 0, -19, -110, 77, 79, 2, 49, 16, -122, -17, 38, -2, 7, -45, -13, -74, 105, -69, -80, -127, -67, -111, -32, 1, 13, 23, 69, 60, 15, -69, 3, 84, 119, -37, 77, 91, 49, -124, -16, -33, 109, 23, 69, 49, -98, -12, 32, -121, -67, -50, -41, 59, -13, -50, -77, 35, 27, -76, 36, 23, 9, -47, 80, 35, -55, -55, 84, 21, -42, 56, -77, -12, 108, -44, 52, -107, 42, -64, 43, -93, 39, -38, -87, -43, -38, 59, -74, -56, -122, -112, 2, -49, 80, -10, -95, 39, -5, 11, 16, -48, -21, -31, 112, -71, -52, -106, 34, -19, 15, 36, -69, -34, -96, -10, 36, 33, 94, -75, -45, 36, 23, 3, -54, 37, 21, 98, 38, -78, -100, -53, 60, -51, -104, -112, -100, -14, 62, -25, -95, -54, 65, -35, 84, 120, 7, 62, -44, 10, -50, 25, 79, -120, -70, -59, 109, 104, -4, 16, -94, 81, -119, 70, 41, 26, -75, -24, 87, -79, 40, 3, 43, 71, -14, 29, 1, -59, -108, -10, 104, 53, 84, -52, -107, -49, 115, -76, 46, -84, 29, -26, 60, -63, 6, 114, -55, -62, 104, -70, 64, 15, -44, 105, 104, -36, -38, -60, 21, 67, 79, -119, 27, 85, 32, 83, 101, -88, -68, 25, 77, -57, -93, -7, -124, 78, 123, 3, -50, -87, 96, 22, -53, -38, -24, -110, 21, -58, 54, -84, 62, -70, 82, -104, -6, -92, -73, 50, 5, 84, -15, 84, -44, -12, -31, -2, -112, 58, -82, -94, 77, -119, -17, -66, -2, 114, -68, 9, -25, -111, 71, -91, 75, -13, -22, -82, 4, 63, -55, 89, 83, 97, -8, -116, 7, 93, -4, 73, -31, -45, -83, -17, 66, 14, 93, -52, 28, 12, -118, -65, -28, 82, 8, -111, 113, -103, -90, 100, -97, -112, 18, 60, 68, -9, 23, -32, 112, -74, 109, -30, 18, -19, -1, -57, 49, -98, -76, -31, -15, 123, 73, 75, -103, 60, 82, 54, 67, -25, -37, -46, 40, -44, 88, -45, -96, -11, 10, -61, -83, -6, -91, -86, -10, -5, -3, -27, -59, -18, 31, -64, 76, 69, 7, 102, 7, -26, 1, 76, -47, -127, -39, -127, 121, -114, 96, -54, -77, 2, 83, 118, 96, 118, 96, 30, -64, 76, 127, 6, -13, 13, -51, 46, -90, -77, 98, 10, 0, 0} ; + private final String fourItemsNonGZIP = "{\"ver\":1,\"name\":\"Microsoft.ApplicationInsights.b69a3a06e25a425ba1a44e9ff6f13582.Event\",\"time\":\"2018-02-11T16:02:36.120-0500\",\"sampleRate\":100.0,\"iKey\":\"b69a3a06-e25a-425b-a1a4-4e9ff6f13582\",\"tags\":{\"ai.internal.sdkVersion\":\"java:2.0.0-beta-snapshot\",\"ai.device.id\":\"test.machine.name\",\"ai.device.locale\":\"en-US\",\"ai.internal.nodename\":\"test.machine.name\",\"ai.device.os\":\"Windows 10\",\"ai.device.roleInstance\":\"test.machine.name\",\"ai.device.osVersion\":\"Windows 10\",\"ai.session.id\":\"20180211160233\"},\"data\":{\"baseType\":\"EventData\",\"baseData\":{\"ver\":2,\"name\":\"TestEvent0\",\"properties\":null}}}\r\n" + + "{\"ver\":1,\"name\":\"Microsoft.ApplicationInsights.b69a3a06e25a425ba1a44e9ff6f13582.Event\",\"time\":\"2018-02-11T16:02:36.131-0500\",\"sampleRate\":100.0,\"iKey\":\"b69a3a06-e25a-425b-a1a4-4e9ff6f13582\",\"tags\":{\"ai.internal.sdkVersion\":\"java:2.0.0-beta-snapshot\",\"ai.device.id\":\"test.machine.name\",\"ai.device.locale\":\"en-US\",\"ai.internal.nodename\":\"test.machine.name\",\"ai.device.os\":\"Windows 10\",\"ai.device.roleInstance\":\"test.machine.name\",\"ai.device.osVersion\":\"Windows 10\",\"ai.session.id\":\"20180211160233\"},\"data\":{\"baseType\":\"EventData\",\"baseData\":{\"ver\":2,\"name\":\"TestEvent1\",\"properties\":null}}}\r\n" + + "{\"ver\":1,\"name\":\"Microsoft.ApplicationInsights.b69a3a06e25a425ba1a44e9ff6f13582.Event\",\"time\":\"2018-02-11T16:02:36.131-0500\",\"sampleRate\":100.0,\"iKey\":\"b69a3a06-e25a-425b-a1a4-4e9ff6f13582\",\"tags\":{\"ai.internal.sdkVersion\":\"java:2.0.0-beta-snapshot\",\"ai.device.id\":\"test.machine.name\",\"ai.device.locale\":\"en-US\",\"ai.internal.nodename\":\"test.machine.name\",\"ai.device.os\":\"Windows 10\",\"ai.device.roleInstance\":\"test.machine.name\",\"ai.device.osVersion\":\"Windows 10\",\"ai.session.id\":\"20180211160233\"},\"data\":{\"baseType\":\"EventData\",\"baseData\":{\"ver\":2,\"name\":\"TestEvent2\",\"properties\":null}}}\r\n" + + "{\"ver\":1,\"name\":\"Microsoft.ApplicationInsights.b69a3a06e25a425ba1a44e9ff6f13582.Event\",\"time\":\"2018-02-11T16:02:36.132-0500\",\"sampleRate\":100.0,\"iKey\":\"b69a3a06-e25a-425b-a1a4-4e9ff6f13582\",\"tags\":{\"ai.internal.sdkVersion\":\"java:2.0.0-beta-snapshot\",\"ai.device.id\":\"test.machine.name\",\"ai.device.locale\":\"en-US\",\"ai.internal.nodename\":\"test.machine.name\",\"ai.device.os\":\"Windows 10\",\"ai.device.roleInstance\":\"test.machine.name\",\"ai.device.osVersion\":\"Windows 10\",\"ai.session.id\":\"20180211160233\"},\"data\":{\"baseType\":\"EventData\",\"baseData\":{\"ver\":2,\"name\":\"TestEvent3\",\"properties\":null}}}"; + + private boolean generateTransmissionWithStatusCode(int code) { + TransmissionPolicyManager tpm = new TransmissionPolicyManager(true); + TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class); + TransmissionHandlerArgs args = new TransmissionHandlerArgs(); + args.setResponseCode(code); + args.setTransmission(new Transmission(new byte[] { 0 }, "testcontent", "testencoding")); + args.setTransmissionDispatcher(mockedDispatcher); + PartialSuccessHandler eh = new PartialSuccessHandler(tpm); + boolean result = eh.validateTransmissionAndSend(args); + return result; + } + + private boolean generateTransmissionWithPartialResult(String responseBody) { + TransmissionPolicyManager tpm = new TransmissionPolicyManager(true); + TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class); + TransmissionHandlerArgs args = new TransmissionHandlerArgs(); + args.setResponseCode(206); + args.setTransmission(new Transmission(fourItems, "application/x-json-stream", "gzip")); + args.setTransmissionDispatcher(mockedDispatcher); + args.setResponseBody(responseBody); + PartialSuccessHandler eh = new PartialSuccessHandler(tpm); + boolean result = eh.validateTransmissionAndSend(args); + return result; + } + + @Test + public void failOnNull() { + TransmissionPolicyManager tpm = new TransmissionPolicyManager(true); + TransmissionHandlerArgs args = new TransmissionHandlerArgs(); + ErrorHandler eh = new ErrorHandler(tpm); + boolean result = eh.validateTransmissionAndSend(args); + Assert.assertFalse(result); + } + + @Test + public void fail200Status() { + boolean result = generateTransmissionWithStatusCode(200); + Assert.assertFalse(result); + } + + @Test + public void fail400Status() { + boolean result = generateTransmissionWithStatusCode(400); + Assert.assertFalse(result); + } + + @Test + public void fail404Status() { + boolean result = generateTransmissionWithStatusCode(404); + Assert.assertFalse(result); + } + + @Test + public void fail408Status() { + boolean result = generateTransmissionWithStatusCode(408); + Assert.assertFalse(result); + } + + @Test + public void fail500Status() { + boolean result = generateTransmissionWithStatusCode(500); + Assert.assertFalse(result); + } + + @Test + public void fail503Status() { + boolean result = generateTransmissionWithStatusCode(503); + Assert.assertFalse(result); + } + + @Test + public void fail429Status() { + boolean result = generateTransmissionWithStatusCode(429); + Assert.assertFalse(result); + } + + @Test + public void fail439Status() { + boolean result = generateTransmissionWithStatusCode(439); + Assert.assertFalse(result); + } + + @Test + public void failEmptyArrayList() { + TransmissionPolicyManager tpm = new TransmissionPolicyManager(true); + TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class); + TransmissionHandlerArgs args = new TransmissionHandlerArgs(); + args.setResponseCode(206); + args.setTransmission(new Transmission(fourItems, "application/x-json-stream", "gzip")); + args.setTransmissionDispatcher(mockedDispatcher); + PartialSuccessHandler eh = new PartialSuccessHandler(tpm); + boolean result = eh.sendNewTransmission(args, new ArrayList()); + Assert.assertFalse(result); + } + + @Test + public void fail206StatusSkipAcceptedRecieved() { + String validResult = "{\r\n" + + " \"itemsReceived\": 4,\r\n" + + " \"itemsAccepted\": 4,\r\n" + + " \"errors\": []\r\n }"; + boolean result = generateTransmissionWithPartialResult(validResult); + Assert.assertFalse(result); + } + + @Test + public void fail206StatusSkipRecievedLargerThanSent() { + String validResult = "{\r\n" + + " \"itemsReceived\": 10,\r\n" + + " \"itemsAccepted\": 9,\r\n" + + " \"errors\": [{\r\n" + + " \"index\": 0,\r\n" + + " \"statusCode\": 400,\r\n" + + " \"message\": \"109: Field 'startTime' on type 'RequestData' is required but missing or empty. Expected: string, Actual: undefined\"\r\n" + + " }]\r\n }"; + boolean result = generateTransmissionWithPartialResult(validResult); + Assert.assertFalse(result); + } + + @Test + public void fail206IndexOutOfRange() { + String validResult = "{\r\n" + + " \"itemsReceived\": 4,\r\n" + + " \"itemsAccepted\": 1,\r\n" + + " \"errors\": [\r\n" + + " {\r\n" + + " \"index\": 20,\r\n" + + " \"statusCode\": 400,\r\n" + + " \"message\": \"109: Field 'startTime' on type 'RequestData' is required but missing or empty. Expected: string, Actual: undefined\"\r\n" + + " },\r\n" + + " {\r\n" + + " \"index\": 12,\r\n" + + " \"statusCode\": 500,\r\n" + + " \"message\": \"Internal Server Error\"\r\n" + + " },\r\n" + + " {\r\n" + + " \"index\": 22,\r\n" + + " \"statusCode\": 439,\r\n" + + " \"message\": \"Too many requests\"\r\n" + + " }\r\n" + + " ]\r\n" + + "}"; + boolean result = generateTransmissionWithPartialResult(validResult); + Assert.assertFalse(result); + } + + @Test + public void pass206MixedIndexOutOfRange() { + String validResult = "{\r\n" + + " \"itemsReceived\": 4,\r\n" + + " \"itemsAccepted\": 1,\r\n" + + " \"errors\": [\r\n" + + " {\r\n" + + " \"index\": 5,\r\n" + + " \"statusCode\": 400,\r\n" + + " \"message\": \"109: Field 'startTime' on type 'RequestData' is required but missing or empty. Expected: string, Actual: undefined\"\r\n" + + " },\r\n" + + " {\r\n" + + " \"index\": 1,\r\n" + + " \"statusCode\": 500,\r\n" + + " \"message\": \"Internal Server Error\"\r\n" + + " },\r\n" + + " {\r\n" + + " \"index\": 22,\r\n" + + " \"statusCode\": 439,\r\n" + + " \"message\": \"Too many requests\"\r\n" + + " }\r\n" + + " ]\r\n" + + "}"; + boolean result = generateTransmissionWithPartialResult(validResult); + Assert.assertTrue(result); + } + + @Test + public void pass206Status() { + String validResult = "{\r\n" + + " \"itemsReceived\": 4,\r\n" + + " \"itemsAccepted\": 1,\r\n" + + " \"errors\": [\r\n" + + " {\r\n" + + " \"index\": 0,\r\n" + + " \"statusCode\": 400,\r\n" + + " \"message\": \"109: Field 'startTime' on type 'RequestData' is required but missing or empty. Expected: string, Actual: undefined\"\r\n" + + " },\r\n" + + " {\r\n" + + " \"index\": 1,\r\n" + + " \"statusCode\": 500,\r\n" + + " \"message\": \"Internal Server Error\"\r\n" + + " },\r\n" + + " {\r\n" + + " \"index\": 2,\r\n" + + " \"statusCode\": 439,\r\n" + + " \"message\": \"Too many requests\"\r\n" + + " }\r\n" + + " ]\r\n" + + "}"; + boolean result = generateTransmissionWithPartialResult(validResult); + Assert.assertTrue(result); + } + + @Test + public void passSingleItemArrayList() { + TransmissionPolicyManager tpm = new TransmissionPolicyManager(true); + TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class); + TransmissionHandlerArgs args = new TransmissionHandlerArgs(); + args.setResponseCode(206); + args.setTransmission(new Transmission(fourItems, "application/x-json-stream", "gzip")); + args.setTransmissionDispatcher(mockedDispatcher); + PartialSuccessHandler eh = new PartialSuccessHandler(tpm); + List singleItem = new ArrayList(); + singleItem.add("{\"ver\":1,\"name\":\"Microsoft.ApplicationInsights.b69a3a06e25a425ba1a44e9ff6f13582.Event\",\"time\":\"2018-02-11T16:02:36.120-0500\",\"sampleRate\":100.0,\"iKey\":\"b69a3a06-e25a-425b-a1a4-4e9ff6f13582\",\"tags\":{\"ai.internal.sdkVersion\":\"java:2.0.0-beta-snapshot\",\"ai.device.id\":\"test.machine.name\",\"ai.device.locale\":\"en-US\",\"ai.internal.nodename\":\"test.machine.name\",\"ai.device.os\":\"Windows 10\",\"ai.device.roleInstance\":\"test.machine.name\",\"ai.device.osVersion\":\"Windows 10\",\"ai.session.id\":\"20180211160233\"},\"data\":{\"baseType\":\"EventData\",\"baseData\":{\"ver\":2,\"name\":\"TestEvent0\",\"properties\":null}}}\r\n"); + boolean result = eh.sendNewTransmission(args, singleItem); + Assert.assertTrue(result); + } + + @Test + public void passGenerateOriginalItemsGZIP() { + TransmissionPolicyManager tpm = new TransmissionPolicyManager(true); + TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class); + TransmissionHandlerArgs args = new TransmissionHandlerArgs(); + args.setResponseCode(206); + args.setTransmission(new Transmission(fourItems, "application/x-json-stream", "gzip")); + args.setTransmissionDispatcher(mockedDispatcher); + PartialSuccessHandler eh = new PartialSuccessHandler(tpm); + List originalItems = eh.generateOriginalItems(args); + Assert.assertEquals(4, originalItems.size()); + } + + @Test + public void passGenerateOriginalItemsNonGZIP() { + TransmissionPolicyManager tpm = new TransmissionPolicyManager(true); + TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class); + TransmissionHandlerArgs args = new TransmissionHandlerArgs(); + args.setResponseCode(206); + args.setTransmission(new Transmission(fourItemsNonGZIP.getBytes(), "application/json", "utf8")); + args.setTransmissionDispatcher(mockedDispatcher); + PartialSuccessHandler eh = new PartialSuccessHandler(tpm); + List originalItems = eh.generateOriginalItems(args); + Assert.assertEquals(4, originalItems.size()); + } + +} diff --git a/core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/ThrottlingHandlerTest.java b/core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/ThrottlingHandlerTest.java new file mode 100644 index 00000000000..05c1963ed26 --- /dev/null +++ b/core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/ThrottlingHandlerTest.java @@ -0,0 +1,145 @@ +package com.microsoft.applicationinsights.internal.channel.common; + +import org.apache.http.message.BasicHeader; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import com.microsoft.applicationinsights.internal.channel.TransmissionDispatcher; +import com.microsoft.applicationinsights.internal.channel.TransmissionHandlerArgs; +import com.microsoft.applicationinsights.internal.channel.common.ErrorHandler; +import com.microsoft.applicationinsights.internal.channel.common.ThrottlingHandler; +import com.microsoft.applicationinsights.internal.channel.common.Transmission; +import com.microsoft.applicationinsights.internal.channel.common.TransmissionPolicyManager; + +public class ThrottlingHandlerTest { + + private final static String RESPONSE_THROTTLING_HEADER = "Retry-After"; + + private boolean generateTransmissionWithStatusCode(int code) { + TransmissionPolicyManager tpm = new TransmissionPolicyManager(true); + TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class); + TransmissionHandlerArgs args = new TransmissionHandlerArgs(); + args.setResponseCode(code); + args.setTransmission(new Transmission(new byte[] { 0 }, "testcontent", "testencoding")); + args.setTransmissionDispatcher(mockedDispatcher); + ThrottlingHandler eh = new ThrottlingHandler(tpm); + boolean result = eh.validateTransmissionAndSend(args); + return result; + } + + private boolean generateTransmissionWithStatusCodeAndHeader(int code, String retryHeader) { + TransmissionPolicyManager tpm = new TransmissionPolicyManager(true); + TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class); + TransmissionHandlerArgs args = new TransmissionHandlerArgs(); + args.setResponseCode(code); + args.setTransmission(new Transmission(new byte[] { 0 }, "testcontent", "testencoding")); + args.setTransmissionDispatcher(mockedDispatcher); + args.setRetryHeader(new BasicHeader(RESPONSE_THROTTLING_HEADER, retryHeader)); + ThrottlingHandler eh = new ThrottlingHandler(tpm); + boolean result = eh.validateTransmissionAndSend(args); + return result; + } + + @Test + public void failOnNull() { + TransmissionPolicyManager tpm = new TransmissionPolicyManager(true); + TransmissionHandlerArgs args = new TransmissionHandlerArgs(); + ErrorHandler eh = new ErrorHandler(tpm); + boolean result = eh.validateTransmissionAndSend(args); + Assert.assertFalse(result); + } + + @Test + public void fail200Status() { + boolean result = generateTransmissionWithStatusCode(200); + Assert.assertFalse(result); + } + + @Test + public void fail206Status() { + boolean result = generateTransmissionWithStatusCode(206); + Assert.assertFalse(result); + } + + @Test + public void fail400Status() { + boolean result = generateTransmissionWithStatusCode(400); + Assert.assertFalse(result); + } + + @Test + public void fail404Status() { + boolean result = generateTransmissionWithStatusCode(404); + Assert.assertFalse(result); + } + + @Test + public void fail408Status() { + boolean result = generateTransmissionWithStatusCode(408); + Assert.assertFalse(result); + } + + @Test + public void pass500Status() { + boolean result = generateTransmissionWithStatusCode(500); + Assert.assertFalse(result); + } + + @Test + public void fail503Status() { + boolean result = generateTransmissionWithStatusCode(503); + Assert.assertFalse(result); + } + + @Test + public void failException() { + TransmissionPolicyManager tpm = new TransmissionPolicyManager(true); + TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class); + TransmissionHandlerArgs args = new TransmissionHandlerArgs(); + args.setResponseCode(0); + args.setTransmission(null); + args.setTransmissionDispatcher(mockedDispatcher); + args.setException(new Exception("Mocked")); + ThrottlingHandler eh = new ThrottlingHandler(tpm); + boolean result = eh.validateTransmissionAndSend(args); + Assert.assertFalse(result); + } + + @Test + public void fail429StatusNoRetryHeader() { + boolean result = generateTransmissionWithStatusCode(429); + Assert.assertFalse(result); + } + + @Test + public void fail439StatusNoRetryHeader() { + boolean result = generateTransmissionWithStatusCode(439); + Assert.assertFalse(result); + } + + @Test + public void pass429StatusBadValue() { + boolean result = generateTransmissionWithStatusCodeAndHeader(429, "3600"); + Assert.assertTrue(result); + } + + @Test + public void pass429StatusGoodValue() { + boolean result = generateTransmissionWithStatusCodeAndHeader(429, "Sun, 11 Feb 2018 16:51:18 GMT"); + Assert.assertTrue(result); + } + + @Test + public void pass439StatusBadValue() { + boolean result = generateTransmissionWithStatusCodeAndHeader(439, "3600"); + Assert.assertTrue(result); + } + + @Test + public void pass439StatusGoodValue() { + boolean result = generateTransmissionWithStatusCodeAndHeader(439, "Sun, 11 Feb 2018 16:51:18 GMT"); + Assert.assertTrue(result); + } + +} diff --git a/core/src/test/java/com/microsoft/applicationinsights/internal/channel/inprocess/InProcessTelemetryChannelTest.java b/core/src/test/java/com/microsoft/applicationinsights/internal/channel/inprocess/InProcessTelemetryChannelTest.java index 0619790d0ed..eed12e3e5e2 100644 --- a/core/src/test/java/com/microsoft/applicationinsights/internal/channel/inprocess/InProcessTelemetryChannelTest.java +++ b/core/src/test/java/com/microsoft/applicationinsights/internal/channel/inprocess/InProcessTelemetryChannelTest.java @@ -29,6 +29,8 @@ public class InProcessTelemetryChannelTest { private final static String NON_VALID_URL = "http:sd{@~fsd.s.d.f;fffff"; + private final static String INSTANT_RETRY_NAME = "MaxInstantRetry"; + private final static int DEFAULT_MAX_INSTANT_RETRY = 3; @Test(expected = IllegalArgumentException.class) public void testNotValidEndpointAddressAsMapValue() { @@ -36,4 +38,24 @@ public void testNotValidEndpointAddressAsMapValue() { map.put("EndpointAddress", NON_VALID_URL); new InProcessTelemetryChannel(map); } + @Test() + public void testStringIntegerMaxInstanceRetry() { + HashMap map = new HashMap(); + map.put(INSTANT_RETRY_NAME, "AABB"); + new InProcessTelemetryChannel(map); + } + + @Test() + public void testValidIntegerMaxInstanceRetry() { + HashMap map = new HashMap(); + map.put(INSTANT_RETRY_NAME, "4"); + new InProcessTelemetryChannel(map); + } + + @Test() + public void testInvalidIntegerMaxInstanceRetry() { + HashMap map = new HashMap(); + map.put(INSTANT_RETRY_NAME, "-1"); + new InProcessTelemetryChannel(map); + } } \ No newline at end of file diff --git a/test/performance/src/main/java/com/microsoft/applicationinsights/core/volume/ThroughputTestTransmitterFactory.java b/test/performance/src/main/java/com/microsoft/applicationinsights/core/volume/ThroughputTestTransmitterFactory.java index be62d49b0a7..19aa8f1a6f1 100644 --- a/test/performance/src/main/java/com/microsoft/applicationinsights/core/volume/ThroughputTestTransmitterFactory.java +++ b/test/performance/src/main/java/com/microsoft/applicationinsights/core/volume/ThroughputTestTransmitterFactory.java @@ -34,11 +34,22 @@ final class ThroughputTestTransmitterFactory implements TransmitterFactory { @Override public TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled) { + return create(endpoint, maxTransmissionStorageCapacity, throttlingIsEnabled, 3); + } + + @Override + public TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled, int maxInstanceRetries) { // An active object with the network sender TransmissionOutput actualNetworkSender = TestThreadLocalData.getTransmissionOutput(); final TransmissionPolicyManager transmissionPolicyManager = new TransmissionPolicyManager(throttlingIsEnabled); + transmissionPolicyManager.addTransmissionHandler(new ErrorHandler(transmissionPolicyManager)); + transmissionPolicyManager.addTransmissionHandler(new PartialSuccessHandler(transmissionPolicyManager)); + transmissionPolicyManager.addTransmissionHandler(new ThrottlingHandler(transmissionPolicyManager)); + transmissionPolicyManager.setMaxInstantRetries(maxInstanceRetries); + TransmissionPolicyStateFetcher stateFetcher = transmissionPolicyManager.getTransmissionPolicyState(); TransmissionOutput networkSender = new ActiveTransmissionNetworkOutput(actualNetworkSender, stateFetcher); + // An active object with the file system sender TransmissionFileSystemOutput fileSystemSender = new TransmissionFileSystemOutput();