diff --git a/core/build.gradle b/core/build.gradle
index da579048d7e..c9bce568a34 100644
--- a/core/build.gradle
+++ b/core/build.gradle
@@ -44,6 +44,7 @@ dependencies {
compile ([group: 'org.apache.commons', name: 'commons-lang3', version: '3.7'])
compile ([group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.3'])
compile ([group: 'com.google.guava', name: 'guava', version: '20.0'])
+ compile ([group: 'com.google.code.gson', name: 'gson', version: '2.8.2'])
testCompile group: 'junit', name: 'junit', version: '4.12'
testCompile group: 'org.mockito', name: 'mockito-all', version: '1.10.19'
testCompile group: 'com.google.code.gson', name: 'gson', version: '2.8.2'
@@ -56,6 +57,7 @@ shadowJar {
relocate 'org.apache.commons', 'com.microsoft.applicationinsights.core.dependencies.apachecommons'
relocate 'com.google.common', 'com.microsoft.applicationinsights.core.dependencies.googlecommon'
relocate 'javax.annotation', 'com.microsoft.applicationinsights.core.dependencies.javaxannotation'
+ relocate 'com.google.gson', 'com.microsoft.applicationinsights.core.dependencies.gson'
}
jar {
diff --git a/core/src/main/java/com/microsoft/applicationinsights/channel/concrete/inprocess/InProcessTelemetryChannel.java b/core/src/main/java/com/microsoft/applicationinsights/channel/concrete/inprocess/InProcessTelemetryChannel.java
index 05bac6d1dbb..a01766ba919 100644
--- a/core/src/main/java/com/microsoft/applicationinsights/channel/concrete/inprocess/InProcessTelemetryChannel.java
+++ b/core/src/main/java/com/microsoft/applicationinsights/channel/concrete/inprocess/InProcessTelemetryChannel.java
@@ -22,7 +22,6 @@
package com.microsoft.applicationinsights.channel.concrete.inprocess;
import java.io.IOException;
-import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URI;
import java.util.Map;
@@ -38,7 +37,6 @@
import com.microsoft.applicationinsights.internal.util.LocalStringsUtils;
import com.microsoft.applicationinsights.internal.util.Sanitizer;
import com.microsoft.applicationinsights.telemetry.JsonTelemetryDataSerializer;
-import com.microsoft.applicationinsights.telemetry.SupportSampling;
import com.microsoft.applicationinsights.telemetry.Telemetry;
import com.microsoft.applicationinsights.channel.TelemetryChannel;
@@ -47,313 +45,343 @@
import org.apache.commons.lang3.exception.ExceptionUtils;
/**
- * An implementation of {@link com.microsoft.applicationinsights.channel.TelemetryChannel}
+ * An implementation of
+ * {@link com.microsoft.applicationinsights.channel.TelemetryChannel}
*
* The channel holds two main entities:
*
- * A buffer for incoming {@link com.microsoft.applicationinsights.telemetry.Telemetry} instances
- * A transmitter
+ * A buffer for incoming
+ * {@link com.microsoft.applicationinsights.telemetry.Telemetry} instances A
+ * transmitter
*
- * The buffer is stores incoming telemetry instances. Every new buffer starts a timer.
- * When the timer expires, or when the buffer is 'full' (whichever happens first), the
- * transmitter will pick up that buffer and will handle its sending to the server. For example,
- * a transmitter will be responsible for compressing, sending and activate a policy in case of failures.
+ * The buffer is stores incoming telemetry instances. Every new buffer starts a
+ * timer. When the timer expires, or when the buffer is 'full' (whichever
+ * happens first), the transmitter will pick up that buffer and will handle its
+ * sending to the server. For example, a transmitter will be responsible for
+ * compressing, sending and activate a policy in case of failures.
*
* The model here is:
*
- * Use application threads to populate the buffer
- * Use channel's threads to send buffers to the server
+ * Use application threads to populate the buffer Use channel's threads to send
+ * buffers to the server
*
* Created by gupele on 12/17/2014.
*/
public final class InProcessTelemetryChannel implements TelemetryChannel {
- private final static int DEFAULT_MAX_TELEMETRY_BUFFER_CAPACITY = 500;
- private final static int MIN_MAX_TELEMETRY_BUFFER_CAPACITY = 1;
- private final static int MAX_MAX_TELEMETRY_BUFFER_CAPACITY = 1000;
- private final static String MAX_MAX_TELEMETRY_BUFFER_CAPACITY_NAME = "MaxTelemetryBufferCapacity";
-
- private final static int DEFAULT_FLUSH_BUFFER_TIMEOUT_IN_SECONDS = 5;
- private final static int MIN_FLUSH_BUFFER_TIMEOUT_IN_SECONDS = 1;
- private final static int MAX_FLUSH_BUFFER_TIMEOUT_IN_SECONDS = 300;
- private final static String FLUSH_BUFFER_TIMEOUT_IN_SECONDS_NAME = "FlushIntervalInSeconds";
-
- private final static String DEVELOPER_MODE_SYSTEM_PROPRETY_NAME = "APPLICATION_INSIGHTS_DEVELOPER_MODE";
-
- private final static String DEVELOPER_MODE_NAME = "DeveloperMode";
- private final static String ENDPOINT_ADDRESS_NAME = "EndpointAddress";
- private final static String MAX_TRANSMISSION_STORAGE_CAPACITY_NAME = "MaxTransmissionStorageFilesCapacityInMB";
-
- private boolean developerMode = false;
- private static TransmitterFactory s_transmitterFactory;
-
- private boolean stopped = false;
-
- private TelemetriesTransmitter telemetriesTransmitter;
-
- private TelemetryBuffer telemetryBuffer;
- private TelemetrySampler telemetrySampler;
-
- private static AtomicLong itemsSent = new AtomicLong(0);
-
- public InProcessTelemetryChannel() {
- boolean developerMode = false;
- try {
- String developerModeAsString = System.getProperty(DEVELOPER_MODE_SYSTEM_PROPRETY_NAME);
- if (!LocalStringsUtils.isNullOrEmpty(developerModeAsString)) {
- developerMode = Boolean.valueOf(developerModeAsString);
- }
- } catch (Throwable t) {
- developerMode = false;
- InternalLogger.INSTANCE.trace("%s generated exception in parsing," +
- "stack trace is %s", DEVELOPER_MODE_SYSTEM_PROPRETY_NAME, ExceptionUtils.getStackTrace(t));
- }
- initialize(null,
- null,
- developerMode,
- createDefaultMaxTelemetryBufferCapacityEnforcer(null),
- createDefaultSendIntervalInSecondsEnforcer(null),
- true);
- }
-
- /**
- * Ctor
- *
- * @param endpointAddress Must be empty string or a valid uri, else an exception will be thrown
- * @param developerMode True will behave in a 'non-production' mode to ease the debugging
- * @param maxTelemetryBufferCapacity Max number of Telemetries we keep in the buffer, when reached we will send the buffer
- * Note, value should be between TRANSMIT_BUFFER_MIN_TIMEOUT_IN_MILLIS and TRANSMIT_BUFFER_MAX_TIMEOUT_IN_MILLIS inclusive
- * @param sendIntervalInMillis The maximum number of milliseconds to wait before we send the buffer
- * Note, value should be between MIN_MAX_TELEMETRY_BUFFER_CAPACITY and MAX_MAX_TELEMETRY_BUFFER_CAPACITY inclusive
- */
- public InProcessTelemetryChannel(String endpointAddress, boolean developerMode, int maxTelemetryBufferCapacity, int sendIntervalInMillis) {
- initialize(endpointAddress,
- null,
- developerMode,
- createDefaultMaxTelemetryBufferCapacityEnforcer(maxTelemetryBufferCapacity),
- createDefaultSendIntervalInSecondsEnforcer(sendIntervalInMillis),
- true);
- }
-
- /**
- * This Ctor will query the 'namesAndValues' map for data to initialize itself
- * It will ignore data that is not of its interest, this Ctor is useful for building an instance from configuration
- *
- * @param namesAndValues - The data passed as name and value pairs
- */
- public InProcessTelemetryChannel(Map namesAndValues) {
- boolean developerMode = false;
- String endpointAddress = null;
-
- LimitsEnforcer maxTelemetryBufferCapacityEnforcer = createDefaultMaxTelemetryBufferCapacityEnforcer(null);
-
- LimitsEnforcer sendIntervalInSecondsEnforcer = createDefaultSendIntervalInSecondsEnforcer(null);
-
- boolean throttling = true;
- if (namesAndValues != null) {
- throttling = Boolean.valueOf(namesAndValues.get("Throttling"));
- developerMode = Boolean.valueOf(namesAndValues.get(DEVELOPER_MODE_NAME));
- if (!developerMode) {
- developerMode = Boolean.valueOf(System.getProperty(DEVELOPER_MODE_SYSTEM_PROPRETY_NAME));
- }
- endpointAddress = namesAndValues.get(ENDPOINT_ADDRESS_NAME);
-
- maxTelemetryBufferCapacityEnforcer.normalizeStringValue(namesAndValues.get(MAX_MAX_TELEMETRY_BUFFER_CAPACITY_NAME));
- sendIntervalInSecondsEnforcer.normalizeStringValue(namesAndValues.get(FLUSH_BUFFER_TIMEOUT_IN_SECONDS_NAME));
- }
-
- String maxTransmissionStorageCapacity = namesAndValues.get(MAX_TRANSMISSION_STORAGE_CAPACITY_NAME);
- initialize(endpointAddress, maxTransmissionStorageCapacity, developerMode, maxTelemetryBufferCapacityEnforcer, sendIntervalInSecondsEnforcer, throttling);
- }
-
- /**
- * Gets value indicating whether this channel is in developer mode.
- */
- @Override
- public boolean isDeveloperMode() {
- return developerMode;
- }
-
- /**
- * Sets value indicating whether this channel is in developer mode.
- *
- * @param developerMode True or false
- */
- @Override
- public void setDeveloperMode(boolean developerMode) {
- if (developerMode != this.developerMode) {
- this.developerMode = developerMode;
- int maxTelemetriesInBatch = this.developerMode ? 1 : DEFAULT_MAX_TELEMETRY_BUFFER_CAPACITY;
-
- setMaxTelemetriesInBatch(maxTelemetriesInBatch);
- }
- }
-
- /**
- * Sends a Telemetry instance through the channel.
- */
- @Override
- public void send(Telemetry telemetry) {
- Preconditions.checkNotNull(telemetry, "Telemetry item must be non null");
-
- if (isDeveloperMode()) {
- telemetry.getContext().getProperties().put("DeveloperMode", "true");
- }
-
- if (telemetrySampler != null) {
- if (!telemetrySampler.isSampledIn(telemetry)) {
- return;
- }
- }
-
- StringWriter writer = new StringWriter();
- JsonTelemetryDataSerializer jsonWriter = null;
- try {
- jsonWriter = new JsonTelemetryDataSerializer(writer);
- telemetry.serialize(jsonWriter);
- jsonWriter.close();
- String asJson = writer.toString();
- telemetryBuffer.add(asJson);
- telemetry.reset();
- if (itemsSent.incrementAndGet() % 10000 == 0) {
- InternalLogger.INSTANCE.info("items sent till now %d", itemsSent.get());
- }
-
- } catch (IOException e) {
- InternalLogger.INSTANCE.error("Failed to serialize Telemetry");
- InternalLogger.INSTANCE.trace("Stack trace is %s", ExceptionUtils.getStackTrace(e));
- return;
- }
-
- if (isDeveloperMode()) {
- writeTelemetryToDebugOutput(telemetry);
- }
- }
-
- /**
- * Stops on going work
- */
- @Override
- public synchronized void stop(long timeout, TimeUnit timeUnit) {
- try {
- if (stopped) {
- return;
- }
-
- telemetriesTransmitter.stop(timeout, timeUnit);
- stopped = true;
- } catch (Throwable t) {
- InternalLogger.INSTANCE.error("Exception generated while stopping telemetry transmitter");
- InternalLogger.INSTANCE.trace("Stack trace generated is %s", ExceptionUtils.getStackTrace(t));
- }
- }
-
- /**
- * Flushes the data that the channel might have internally.
- */
- @Override
- public void flush() {
- telemetryBuffer.flush();
- }
-
- /**
- * Sets an optional Sampler that can sample out telemetries
- * Currently, we don't allow to replace a valid telemtry sampler.
- *
- * @param telemetrySampler - The sampler
- */
- @Override
- public void setSampler(TelemetrySampler telemetrySampler) {
- if (this.telemetrySampler == null) {
- this.telemetrySampler = telemetrySampler;
- }
- }
-
- /**
- * Sets the buffer size
- *
- * @param maxTelemetriesInBatch should be between MIN_MAX_TELEMETRY_BUFFER_CAPACITY
- * and MAX_MAX_TELEMETRY_BUFFER_CAPACITY inclusive
- * if the number is lower than the minimum then the minimum will be used
- * if the number is higher than the maximum then the maximum will be used
- */
- public void setMaxTelemetriesInBatch(int maxTelemetriesInBatch) {
- telemetryBuffer.setMaxTelemetriesInBatch(maxTelemetriesInBatch);
- }
-
- /**
- * Sets the time tow wait before flushing the internal buffer
- *
- * @param transmitBufferTimeoutInSeconds should be between MIN_FLUSH_BUFFER_TIMEOUT_IN_SECONDS
- * and MAX_FLUSH_BUFFER_TIMEOUT_IN_SECONDS inclusive
- * if the number is lower than the minimum then the minimum will be used
- * if the number is higher than the maximum then the maximum will be used
- */
- public void setTransmitBufferTimeoutInSeconds(int transmitBufferTimeoutInSeconds) {
- telemetryBuffer.setTransmitBufferTimeoutInSeconds(transmitBufferTimeoutInSeconds);
- }
-
- private void writeTelemetryToDebugOutput(Telemetry telemetry) {
- InternalLogger.INSTANCE.trace("InProcessTelemetryChannel sending telemetry");
- }
-
- private synchronized void initialize(String endpointAddress,
- String maxTransmissionStorageCapacity,
- boolean developerMode,
- LimitsEnforcer maxTelemetryBufferCapacityEnforcer,
- LimitsEnforcer sendIntervalInSeconds,
- boolean throttling) {
- makeSureEndpointAddressIsValid(endpointAddress);
-
- if (s_transmitterFactory == null) {
- s_transmitterFactory = new InProcessTelemetryChannelFactory();
- }
-
- telemetriesTransmitter = s_transmitterFactory.create(endpointAddress, maxTransmissionStorageCapacity, throttling);
- telemetryBuffer = new TelemetryBuffer(telemetriesTransmitter, maxTelemetryBufferCapacityEnforcer, sendIntervalInSeconds);
-
- setDeveloperMode(developerMode);
- }
-
- /**
- * The method will throw IllegalArgumentException if the endpointAddress is not a valid uri
- * Please note that a null or empty string is valid as far as the class is concerned and thus considered valid
- *
- * @param endpointAddress
- */
- private void makeSureEndpointAddressIsValid(String endpointAddress) {
- if (Strings.isNullOrEmpty(endpointAddress)) {
- return;
- }
-
- URI uri = Sanitizer.sanitizeUri(endpointAddress);
- if (uri == null) {
- String errorMessage = String.format("Endpoint address %s is not a valid uri", endpointAddress);
- InternalLogger.INSTANCE.error(errorMessage);
- throw new IllegalArgumentException(errorMessage);
- }
- }
-
- private LimitsEnforcer createDefaultMaxTelemetryBufferCapacityEnforcer(Integer currentValue) {
- LimitsEnforcer maxItemsInBatchEnforcer =
- LimitsEnforcer.createWithClosestLimitOnError(
- MAX_MAX_TELEMETRY_BUFFER_CAPACITY_NAME,
- MIN_MAX_TELEMETRY_BUFFER_CAPACITY,
- MAX_MAX_TELEMETRY_BUFFER_CAPACITY,
- DEFAULT_MAX_TELEMETRY_BUFFER_CAPACITY,
- currentValue == null ? DEFAULT_MAX_TELEMETRY_BUFFER_CAPACITY : currentValue);
-
- return maxItemsInBatchEnforcer;
- }
-
- private LimitsEnforcer createDefaultSendIntervalInSecondsEnforcer(Integer currentValue) {
- LimitsEnforcer sendIntervalInSecondsEnforcer =
- LimitsEnforcer.createWithClosestLimitOnError(
- FLUSH_BUFFER_TIMEOUT_IN_SECONDS_NAME,
- MIN_FLUSH_BUFFER_TIMEOUT_IN_SECONDS,
- MAX_FLUSH_BUFFER_TIMEOUT_IN_SECONDS,
- DEFAULT_FLUSH_BUFFER_TIMEOUT_IN_SECONDS,
- currentValue == null ? DEFAULT_FLUSH_BUFFER_TIMEOUT_IN_SECONDS : currentValue);
-
- return sendIntervalInSecondsEnforcer;
- }
+
+ private final static String INSTANT_RETRY_NAME = "MaxInstantRetry";
+ private final static int DEFAULT_MAX_INSTANT_RETRY = 3;
+ private final static int DEFAULT_MAX_TELEMETRY_BUFFER_CAPACITY = 500;
+ private final static int MIN_MAX_TELEMETRY_BUFFER_CAPACITY = 1;
+ private final static int MAX_MAX_TELEMETRY_BUFFER_CAPACITY = 1000;
+ private final static String MAX_MAX_TELEMETRY_BUFFER_CAPACITY_NAME = "MaxTelemetryBufferCapacity";
+
+ private final static int DEFAULT_FLUSH_BUFFER_TIMEOUT_IN_SECONDS = 5;
+ private final static int MIN_FLUSH_BUFFER_TIMEOUT_IN_SECONDS = 1;
+ private final static int MAX_FLUSH_BUFFER_TIMEOUT_IN_SECONDS = 300;
+ private final static String FLUSH_BUFFER_TIMEOUT_IN_SECONDS_NAME = "FlushIntervalInSeconds";
+
+ private final static String DEVELOPER_MODE_SYSTEM_PROPRETY_NAME = "APPLICATION_INSIGHTS_DEVELOPER_MODE";
+
+ private final static String DEVELOPER_MODE_NAME = "DeveloperMode";
+ private final static String ENDPOINT_ADDRESS_NAME = "EndpointAddress";
+ private final static String MAX_TRANSMISSION_STORAGE_CAPACITY_NAME = "MaxTransmissionStorageFilesCapacityInMB";
+
+ private boolean developerMode = false;
+ private static TransmitterFactory s_transmitterFactory;
+
+ private boolean stopped = false;
+
+ private TelemetriesTransmitter telemetriesTransmitter;
+
+ private TelemetryBuffer telemetryBuffer;
+ private TelemetrySampler telemetrySampler;
+
+ private static AtomicLong itemsSent = new AtomicLong(0);
+
+ public InProcessTelemetryChannel() {
+ boolean developerMode = false;
+ try {
+ String developerModeAsString = System.getProperty(DEVELOPER_MODE_SYSTEM_PROPRETY_NAME);
+ if (!LocalStringsUtils.isNullOrEmpty(developerModeAsString)) {
+ developerMode = Boolean.valueOf(developerModeAsString);
+ }
+ } catch (Throwable t) {
+ developerMode = false;
+ InternalLogger.INSTANCE.trace("%s generated exception in parsing," + "stack trace is %s",
+ DEVELOPER_MODE_SYSTEM_PROPRETY_NAME, ExceptionUtils.getStackTrace(t));
+ }
+ initialize(null, null, developerMode, createDefaultMaxTelemetryBufferCapacityEnforcer(null),
+ createDefaultSendIntervalInSecondsEnforcer(null), true);
+ }
+
+ /**
+ * Ctor
+ *
+ * @param endpointAddress
+ * Must be empty string or a valid uri, else an exception will be
+ * thrown
+ * @param developerMode
+ * True will behave in a 'non-production' mode to ease the debugging
+ * @param maxTelemetryBufferCapacity
+ * Max number of Telemetries we keep in the buffer, when reached we
+ * will send the buffer Note, value should be between
+ * TRANSMIT_BUFFER_MIN_TIMEOUT_IN_MILLIS and
+ * TRANSMIT_BUFFER_MAX_TIMEOUT_IN_MILLIS inclusive
+ * @param sendIntervalInMillis
+ * The maximum number of milliseconds to wait before we send the
+ * buffer Note, value should be between
+ * MIN_MAX_TELEMETRY_BUFFER_CAPACITY and
+ * MAX_MAX_TELEMETRY_BUFFER_CAPACITY inclusive
+ */
+ public InProcessTelemetryChannel(String endpointAddress, boolean developerMode, int maxTelemetryBufferCapacity,
+ int sendIntervalInMillis) {
+ initialize(endpointAddress, null, developerMode,
+ createDefaultMaxTelemetryBufferCapacityEnforcer(maxTelemetryBufferCapacity),
+ createDefaultSendIntervalInSecondsEnforcer(sendIntervalInMillis), true);
+ }
+
+ /**
+ * This Ctor will query the 'namesAndValues' map for data to initialize itself
+ * It will ignore data that is not of its interest, this Ctor is useful for
+ * building an instance from configuration
+ *
+ * @param namesAndValues
+ * - The data passed as name and value pairs
+ */
+ public InProcessTelemetryChannel(Map namesAndValues) {
+ boolean developerMode = false;
+ String endpointAddress = null;
+ int maxInstantRetries = DEFAULT_MAX_INSTANT_RETRY;
+
+ LimitsEnforcer maxTelemetryBufferCapacityEnforcer = createDefaultMaxTelemetryBufferCapacityEnforcer(null);
+
+ LimitsEnforcer sendIntervalInSecondsEnforcer = createDefaultSendIntervalInSecondsEnforcer(null);
+
+ boolean throttling = true;
+ if (namesAndValues != null) {
+ throttling = Boolean.valueOf(namesAndValues.get("Throttling"));
+ developerMode = Boolean.valueOf(namesAndValues.get(DEVELOPER_MODE_NAME));
+ try {
+ String instantRetryValue = namesAndValues.get(INSTANT_RETRY_NAME);
+ if (instantRetryValue != null){
+ maxInstantRetries = Integer.parseInt(instantRetryValue);
+ }
+
+ } catch (NumberFormatException e) {
+ InternalLogger.INSTANCE.error("Unable to parse configuration setting %s to integer value.%nStack Trace:%n%s", INSTANT_RETRY_NAME, ExceptionUtils.getStackTrace(e));
+ }
+
+ if (!developerMode) {
+ developerMode = Boolean.valueOf(System.getProperty(DEVELOPER_MODE_SYSTEM_PROPRETY_NAME));
+ }
+ endpointAddress = namesAndValues.get(ENDPOINT_ADDRESS_NAME);
+
+ maxTelemetryBufferCapacityEnforcer
+ .normalizeStringValue(namesAndValues.get(MAX_MAX_TELEMETRY_BUFFER_CAPACITY_NAME));
+ sendIntervalInSecondsEnforcer
+ .normalizeStringValue(namesAndValues.get(FLUSH_BUFFER_TIMEOUT_IN_SECONDS_NAME));
+ }
+
+ String maxTransmissionStorageCapacity = namesAndValues.get(MAX_TRANSMISSION_STORAGE_CAPACITY_NAME);
+ initialize(endpointAddress, maxTransmissionStorageCapacity, developerMode, maxTelemetryBufferCapacityEnforcer,
+ sendIntervalInSecondsEnforcer, throttling, maxInstantRetries);
+ }
+
+ /**
+ * Gets value indicating whether this channel is in developer mode.
+ */
+ @Override
+ public boolean isDeveloperMode() {
+ return developerMode;
+ }
+
+ /**
+ * Sets value indicating whether this channel is in developer mode.
+ *
+ * @param developerMode
+ * True or false
+ */
+ @Override
+ public void setDeveloperMode(boolean developerMode) {
+ if (developerMode != this.developerMode) {
+ this.developerMode = developerMode;
+ int maxTelemetriesInBatch = this.developerMode ? 1 : DEFAULT_MAX_TELEMETRY_BUFFER_CAPACITY;
+
+ setMaxTelemetriesInBatch(maxTelemetriesInBatch);
+ }
+ }
+
+ /**
+ * Sends a Telemetry instance through the channel.
+ */
+ @Override
+ public void send(Telemetry telemetry) {
+ Preconditions.checkNotNull(telemetry, "Telemetry item must be non null");
+
+ if (isDeveloperMode()) {
+ telemetry.getContext().getProperties().put("DeveloperMode", "true");
+ }
+
+ if (telemetrySampler != null) {
+ if (!telemetrySampler.isSampledIn(telemetry)) {
+ return;
+ }
+ }
+
+ StringWriter writer = new StringWriter();
+ JsonTelemetryDataSerializer jsonWriter = null;
+ try {
+ jsonWriter = new JsonTelemetryDataSerializer(writer);
+ telemetry.serialize(jsonWriter);
+ jsonWriter.close();
+ String asJson = writer.toString();
+ telemetryBuffer.add(asJson);
+ telemetry.reset();
+ if (itemsSent.incrementAndGet() % 10000 == 0) {
+ InternalLogger.INSTANCE.info("items sent till now %d", itemsSent.get());
+ }
+
+ } catch (IOException e) {
+ InternalLogger.INSTANCE.error("Failed to serialize Telemetry");
+ InternalLogger.INSTANCE.trace("Stack trace is %s", ExceptionUtils.getStackTrace(e));
+ return;
+ }
+
+ if (isDeveloperMode()) {
+ writeTelemetryToDebugOutput(telemetry);
+ }
+ }
+
+ /**
+ * Stops on going work
+ */
+ @Override
+ public synchronized void stop(long timeout, TimeUnit timeUnit) {
+ try {
+ if (stopped) {
+ return;
+ }
+
+ telemetriesTransmitter.stop(timeout, timeUnit);
+ stopped = true;
+ } catch (Throwable t) {
+ InternalLogger.INSTANCE.error("Exception generated while stopping telemetry transmitter");
+ InternalLogger.INSTANCE.trace("Stack trace generated is %s", ExceptionUtils.getStackTrace(t));
+ }
+ }
+
+ /**
+ * Flushes the data that the channel might have internally.
+ */
+ @Override
+ public void flush() {
+ telemetryBuffer.flush();
+ }
+
+ /**
+ * Sets an optional Sampler that can sample out telemetries Currently, we don't
+ * allow to replace a valid telemtry sampler.
+ *
+ * @param telemetrySampler
+ * - The sampler
+ */
+ @Override
+ public void setSampler(TelemetrySampler telemetrySampler) {
+ if (this.telemetrySampler == null) {
+ this.telemetrySampler = telemetrySampler;
+ }
+ }
+
+ /**
+ * Sets the buffer size
+ *
+ * @param maxTelemetriesInBatch
+ * should be between MIN_MAX_TELEMETRY_BUFFER_CAPACITY and
+ * MAX_MAX_TELEMETRY_BUFFER_CAPACITY inclusive if the number is lower
+ * than the minimum then the minimum will be used if the number is
+ * higher than the maximum then the maximum will be used
+ */
+ public void setMaxTelemetriesInBatch(int maxTelemetriesInBatch) {
+ telemetryBuffer.setMaxTelemetriesInBatch(maxTelemetriesInBatch);
+ }
+
+ /**
+ * Sets the time tow wait before flushing the internal buffer
+ *
+ * @param transmitBufferTimeoutInSeconds
+ * should be between MIN_FLUSH_BUFFER_TIMEOUT_IN_SECONDS and
+ * MAX_FLUSH_BUFFER_TIMEOUT_IN_SECONDS inclusive if the number is
+ * lower than the minimum then the minimum will be used if the number
+ * is higher than the maximum then the maximum will be used
+ */
+ public void setTransmitBufferTimeoutInSeconds(int transmitBufferTimeoutInSeconds) {
+ telemetryBuffer.setTransmitBufferTimeoutInSeconds(transmitBufferTimeoutInSeconds);
+ }
+
+ private void writeTelemetryToDebugOutput(Telemetry telemetry) {
+ InternalLogger.INSTANCE.trace("InProcessTelemetryChannel sending telemetry");
+ }
+
+ private synchronized void initialize(String endpointAddress, String maxTransmissionStorageCapacity,
+ boolean developerMode, LimitsEnforcer maxTelemetryBufferCapacityEnforcer,
+ LimitsEnforcer sendIntervalInSeconds, boolean throttling) {
+ initialize(endpointAddress, maxTransmissionStorageCapacity, developerMode, maxTelemetryBufferCapacityEnforcer,
+ sendIntervalInSeconds, throttling, DEFAULT_MAX_INSTANT_RETRY);
+ }
+
+ private synchronized void initialize(String endpointAddress, String maxTransmissionStorageCapacity,
+ boolean developerMode, LimitsEnforcer maxTelemetryBufferCapacityEnforcer,
+ LimitsEnforcer sendIntervalInSeconds, boolean throttling, int maxInstantRetry) {
+ makeSureEndpointAddressIsValid(endpointAddress);
+
+ if (s_transmitterFactory == null) {
+ s_transmitterFactory = new InProcessTelemetryChannelFactory();
+ }
+
+ telemetriesTransmitter = s_transmitterFactory.create(endpointAddress, maxTransmissionStorageCapacity,
+ throttling, maxInstantRetry);
+ telemetryBuffer = new TelemetryBuffer(telemetriesTransmitter, maxTelemetryBufferCapacityEnforcer,
+ sendIntervalInSeconds);
+
+ setDeveloperMode(developerMode);
+ }
+
+ /**
+ * The method will throw IllegalArgumentException if the endpointAddress is not
+ * a valid uri Please note that a null or empty string is valid as far as the
+ * class is concerned and thus considered valid
+ *
+ * @param endpointAddress
+ */
+ private void makeSureEndpointAddressIsValid(String endpointAddress) {
+ if (Strings.isNullOrEmpty(endpointAddress)) {
+ return;
+ }
+
+ URI uri = Sanitizer.sanitizeUri(endpointAddress);
+ if (uri == null) {
+ String errorMessage = String.format("Endpoint address %s is not a valid uri", endpointAddress);
+ InternalLogger.INSTANCE.error(errorMessage);
+ throw new IllegalArgumentException(errorMessage);
+ }
+ }
+
+ private LimitsEnforcer createDefaultMaxTelemetryBufferCapacityEnforcer(Integer currentValue) {
+ LimitsEnforcer maxItemsInBatchEnforcer = LimitsEnforcer.createWithClosestLimitOnError(
+ MAX_MAX_TELEMETRY_BUFFER_CAPACITY_NAME, MIN_MAX_TELEMETRY_BUFFER_CAPACITY,
+ MAX_MAX_TELEMETRY_BUFFER_CAPACITY, DEFAULT_MAX_TELEMETRY_BUFFER_CAPACITY,
+ currentValue == null ? DEFAULT_MAX_TELEMETRY_BUFFER_CAPACITY : currentValue);
+
+ return maxItemsInBatchEnforcer;
+ }
+
+ private LimitsEnforcer createDefaultSendIntervalInSecondsEnforcer(Integer currentValue) {
+ LimitsEnforcer sendIntervalInSecondsEnforcer = LimitsEnforcer.createWithClosestLimitOnError(
+ FLUSH_BUFFER_TIMEOUT_IN_SECONDS_NAME, MIN_FLUSH_BUFFER_TIMEOUT_IN_SECONDS,
+ MAX_FLUSH_BUFFER_TIMEOUT_IN_SECONDS, DEFAULT_FLUSH_BUFFER_TIMEOUT_IN_SECONDS,
+ currentValue == null ? DEFAULT_FLUSH_BUFFER_TIMEOUT_IN_SECONDS : currentValue);
+
+ return sendIntervalInSecondsEnforcer;
+ }
}
diff --git a/core/src/main/java/com/microsoft/applicationinsights/channel/concrete/inprocess/InProcessTelemetryChannelFactory.java b/core/src/main/java/com/microsoft/applicationinsights/channel/concrete/inprocess/InProcessTelemetryChannelFactory.java
index dfa01e53de5..4a687fcd72c 100644
--- a/core/src/main/java/com/microsoft/applicationinsights/channel/concrete/inprocess/InProcessTelemetryChannelFactory.java
+++ b/core/src/main/java/com/microsoft/applicationinsights/channel/concrete/inprocess/InProcessTelemetryChannelFactory.java
@@ -33,10 +33,18 @@
* Created by gupele on 1/15/2015.
*/
final class InProcessTelemetryChannelFactory implements TransmitterFactory {
+ private final int DEFAULT_RETRY = 3;
+ @Override
+ public TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled) {
+ return create(endpoint, maxTransmissionStorageCapacity, throttlingIsEnabled, DEFAULT_RETRY);
+ }
@Override
- public TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled) {
- final TransmissionPolicyManager transmissionPolicyManager = new TransmissionPolicyManager(throttlingIsEnabled);
-
+ public TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled, int maxInstantRetries) {
+ final TransmissionPolicyManager transmissionPolicyManager = new TransmissionPolicyManager(throttlingIsEnabled);
+ transmissionPolicyManager.addTransmissionHandler(new ErrorHandler(transmissionPolicyManager));
+ transmissionPolicyManager.addTransmissionHandler(new PartialSuccessHandler(transmissionPolicyManager));
+ transmissionPolicyManager.addTransmissionHandler(new ThrottlingHandler(transmissionPolicyManager));
+ transmissionPolicyManager.setMaxInstantRetries(maxInstantRetries);
// An active object with the network sender
TransmissionNetworkOutput actualNetworkSender = TransmissionNetworkOutput.create(endpoint, transmissionPolicyManager);
@@ -51,6 +59,7 @@ public TelemetriesTransmitter create(String endpoint, String maxTransmissionStor
// The dispatcher works with the two active senders
TransmissionDispatcher dispatcher = new NonBlockingDispatcher(new TransmissionOutput[] {networkSender, activeFileSystemOutput});
actualNetworkSender.setTransmissionDispatcher(dispatcher);
+
// The loader works with the file system loader as the active one does
TransmissionsLoader transmissionsLoader = new ActiveTransmissionLoader(fileSystemSender, stateFetcher, dispatcher);
diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandler.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandler.java
new file mode 100644
index 00000000000..957fa314c37
--- /dev/null
+++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandler.java
@@ -0,0 +1,17 @@
+package com.microsoft.applicationinsights.internal.channel;
+
+/**
+ * An interface that is used to create a concrete class that is called by the the {@link TransmissionHandlerObserver}
+ *
+ * This is used to implement classes like {@link ErrorHandler} and {@link PartialSuccessHandler}.
+ * @author jamdavi
+ *
+ *
+ */
+public interface TransmissionHandler {
+ /**
+ * Called when a transmission is sent by the {@link TransmissionOutput}.
+ * @param args The {@link TransmissionHandlerArgs} for this handler.
+ */
+ void onTransmissionSent(TransmissionHandlerArgs args);
+}
diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandlerArgs.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandlerArgs.java
new file mode 100644
index 00000000000..cb767d206af
--- /dev/null
+++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandlerArgs.java
@@ -0,0 +1,87 @@
+package com.microsoft.applicationinsights.internal.channel;
+
+import org.apache.http.Header;
+
+import com.microsoft.applicationinsights.internal.channel.common.Transmission;
+
+/**
+ * This class is used to store information between the transmission sender and the transmission handlers
+ *
+ * An example class that uses this are {@link ErrorHandler}
+ * @author jamdavi
+ *
+ */
+public class TransmissionHandlerArgs {
+ private String responseBody;
+ /**
+ * Set the response body.
+ * @param body The HTTP Response from the sender
+ */
+ public void setResponseBody(String body) { this.responseBody = body;}
+ /**
+ * Get the response body
+ * @return The HTTP Response from the sender
+ */
+ public String getResponseBody() { return this.responseBody;}
+
+
+ private TransmissionDispatcher transmissionDispatcher;
+ /**
+ * Set the {@link TransmissionDispatcher} used by the sender
+ * @param dispatcher The {@link TransmissionDispatcher} used by the sender
+ */
+ public void setTransmissionDispatcher(TransmissionDispatcher dispatcher) { this.transmissionDispatcher = dispatcher;}
+ /**
+ * Get the {@link TransmissionDispatcher} used by the sender
+ * @return The {@link TransmissionDispatcher} used by the sender
+ */
+ public TransmissionDispatcher getTransmissionDispatcher() { return this.transmissionDispatcher;}
+
+ private Transmission transmission;
+ /**
+ * Set the transmission that needs to be passed to the handler.
+ * @param transmission The transmission that needs to be passed to the handler.
+ */
+ public void setTransmission(Transmission transmission) { this.transmission = transmission;}
+ /**
+ * Get the transmission that needs to be passed to the handler.
+ * @return The transmission used by the handler.
+ */
+ public Transmission getTransmission() { return this.transmission;}
+
+ private int responseCode;
+ /**
+ * Set the response code to be passed to the handler.
+ * @param code The HTTP response code.
+ */
+ public void setResponseCode(int code) { this.responseCode = code;}
+ /**
+ * Get the response code for the handler to use.
+ * @return The HTTP response code.
+ */
+ public int getResponseCode() { return this.responseCode;}
+
+ private Throwable exception;
+ /**
+ * Set the exception thrown by the sender to be passed the handler.
+ * @param ex The exception
+ */
+ public void setException(Throwable ex) { this.exception = ex;}
+ /**
+ * Get the exception thrown by the sender to be used by the handler.
+ * @return The exception
+ */
+ public Throwable getException() { return this.exception;}
+
+ private Header retryHeader;
+ /**
+ * Set the Retry-After header to be passed to the handler.
+ * @param head The Retry-After header
+ */
+ public void setRetryHeader(Header head) { this.retryHeader = head;}
+ /**
+ * Get the Retry-After header to be passed to the handler.
+ * @return The Retry-After header
+ */
+ public Header getRetryHeader() { return this.retryHeader;}
+}
diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandlerObserver.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandlerObserver.java
new file mode 100644
index 00000000000..e4b4ba65449
--- /dev/null
+++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmissionHandlerObserver.java
@@ -0,0 +1,18 @@
+package com.microsoft.applicationinsights.internal.channel;
+
+
+/**
+ * Enables the {@link TransmissionPolicyManager} to handle transmission states.
+ *
+ * This interface extends {@TransmissionHandler} to add the ability to observe when the transmission is completed.
+ * @author jamdavi
+ *
+ */
+public interface TransmissionHandlerObserver extends TransmissionHandler {
+
+ /**
+ * Used to add a {@link TransmissionHandler} to the collection stored by the {@link TransmissionPolicyManager}
+ * @param handler The handler to add to the collection.
+ */
+ void addTransmissionHandler(TransmissionHandler handler);
+}
diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmitterFactory.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmitterFactory.java
index c18b8d16b0d..aa46694f96d 100644
--- a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmitterFactory.java
+++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/TransmitterFactory.java
@@ -25,5 +25,21 @@
* Created by gupele on 12/21/2014.
*/
public interface TransmitterFactory {
+ /**
+ * Creates the {@link TelemetriesTransmitter} for use by the {@link TelemetryChannel}
+ * @param endpoint HTTP Endpoint to send telemetry to
+ * @param maxTransmissionStorageCapacity Max amount of disk space in KB for persistent storage to use
+ * @param throttlingIsEnabled Allow the network telemetry sender to be throttled
+ * @return The {@link TelemetriesTransmitter} object
+ */
TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled);
+ /**
+ * Creates the {@link TelemetriesTransmitter} for use by the {@link TelemetryChannel}
+ * @param endpoint HTTP Endpoint to send telemetry to
+ * @param maxTransmissionStorageCapacity Max amount of disk space in KB for persistent storage to use
+ * @param throttlingIsEnabled Allow the network telemetry sender to be throttled
+ * @param maxInstantRetries Number of instant retries in case of a temporary network outage
+ * @return The {@link TelemetriesTransmitter} object
+ */
+ TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled, int maxInstantRetries);
}
diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ActiveTransmissionLoader.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ActiveTransmissionLoader.java
index e149d851a8c..bcd936e6edc 100644
--- a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ActiveTransmissionLoader.java
+++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ActiveTransmissionLoader.java
@@ -26,13 +26,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.base.Preconditions;
import com.microsoft.applicationinsights.internal.channel.TransmissionDispatcher;
import com.microsoft.applicationinsights.internal.channel.TransmissionsLoader;
import com.microsoft.applicationinsights.internal.logger.InternalLogger;
-import com.google.common.base.Preconditions;
-import com.microsoft.applicationinsights.internal.shutdown.Stoppable;
-
/**
* The class is responsible for loading transmission files that were saved to the disk
*
@@ -108,7 +106,7 @@ public void run() {
case UNBLOCKED:
fetchNext(true);
break;
-
+ case BACKOFF:
case BLOCKED_BUT_CAN_BE_PERSISTED:
Thread.sleep(DEFAULT_SLEEP_INTERVAL_AFTER_DISPATCHING_IN_MILLS);
break;
diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/BackendResponse.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/BackendResponse.java
new file mode 100644
index 00000000000..ac74ea10cad
--- /dev/null
+++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/BackendResponse.java
@@ -0,0 +1,20 @@
+package com.microsoft.applicationinsights.internal.channel.common;
+
+/**
+ * Utility class used by the {@link PartialSuccessHandler}
+ *
+ * @author jamdavi
+ *
+ */
+class BackendResponse {
+
+ int itemsReceived;
+ int itemsAccepted;
+ Error[] errors;
+
+ class Error {
+ public int index;
+ public int statusCode;
+ public String message;
+ }
+}
diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ErrorHandler.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ErrorHandler.java
new file mode 100644
index 00000000000..265e1266c78
--- /dev/null
+++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ErrorHandler.java
@@ -0,0 +1,70 @@
+package com.microsoft.applicationinsights.internal.channel.common;
+
+import com.microsoft.applicationinsights.internal.channel.TransmissionHandler;
+import com.microsoft.applicationinsights.internal.channel.TransmissionHandlerArgs;
+import com.microsoft.applicationinsights.internal.logger.InternalLogger;
+
+/**
+ * This class implements the retry logic for transmissions with the results of a
+ * 408, 500, and 503 result.
+ *
+ * It does not handle any error codes such as 400, 401, 403, 404, etc.
+ *
+ * @author jamdavi
+ *
+ */
+public class ErrorHandler implements TransmissionHandler {
+
+ private TransmissionPolicyManager transmissionPolicyManager;
+
+ /**
+ * Ctor
+ *
+ * Constructs the ErrorHandler object.
+ *
+ * @param policy
+ * The {@link TransmissionPolicyManager} object that is needed to
+ * control the back off policy
+ */
+ public ErrorHandler(TransmissionPolicyManager policy) {
+ this.transmissionPolicyManager = policy;
+ }
+
+ @Override
+ public void onTransmissionSent(TransmissionHandlerArgs args) {
+
+ validateTransmissionAndSend(args);
+ }
+
+ boolean validateTransmissionAndSend(TransmissionHandlerArgs args) {
+ if (args.getTransmission() != null && args.getTransmissionDispatcher() != null) {
+ args.getTransmission().incrementNumberOfSends();
+ switch (args.getResponseCode()) {
+ case TransmissionSendResult.REQUEST_TIMEOUT:
+ case TransmissionSendResult.INTERNAL_SERVER_ERROR:
+ case TransmissionSendResult.SERVICE_UNAVAILABLE:
+ backoffAndSendTransmission(args);
+ return true;
+ default:
+ InternalLogger.INSTANCE.trace("Http response code %s not handled by %s", args.getResponseCode(),
+ this.getClass().getName());
+ return false;
+ }
+ } else if (args.getException() != null) {
+ backoffAndSendTransmission(args);
+ return true;
+ }
+ return false;
+ }
+
+ private void backoffAndSendTransmission(TransmissionHandlerArgs args) {
+ // It is possible for us to have a temporary blip in transmission
+ // this setting will allow us to control how many instant retries we perform
+ // before backing off the send
+ if (args.getTransmission() != null && (args.getTransmission().getNumberOfSends() > transmissionPolicyManager.getMaxInstantRetries()))
+ {
+ this.transmissionPolicyManager.backoff();
+ }
+ args.getTransmissionDispatcher().dispatch(args.getTransmission());
+ }
+}
diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/PartialSuccessHandler.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/PartialSuccessHandler.java
new file mode 100644
index 00000000000..da1d7ae074a
--- /dev/null
+++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/PartialSuccessHandler.java
@@ -0,0 +1,192 @@
+package com.microsoft.applicationinsights.internal.channel.common;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.http.HttpStatus;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.common.base.Optional;
+import com.microsoft.applicationinsights.internal.channel.TransmissionHandler;
+import com.microsoft.applicationinsights.internal.channel.TransmissionHandlerArgs;
+import com.microsoft.applicationinsights.internal.logger.InternalLogger;
+
+/**
+ * This class implements the retry logic for partially accepted transmissions.
+ * HTTP status code 206.
+ *
+ *
+ * @see PartialSuccessTransmissionPolicy
+ * @author jamdavi
+ *
+ */
+public class PartialSuccessHandler implements TransmissionHandler {
+
+ /**
+ * Ctor
+ *
+ * Constructs the PartialSuccessHandler object.
+ *
+ * @param policy
+ * The {@link TransmissionPolicyManager} object that is needed to
+ * control the back off policy.
+ */
+ public PartialSuccessHandler(TransmissionPolicyManager policy) {
+ }
+
+ @Override
+ public void onTransmissionSent(TransmissionHandlerArgs args) {
+ validateTransmissionAndSend(args);
+ }
+
+ /**
+ * Provides the core logic for the retransmission
+ *
+ * @param args
+ * The {@link TransmissionHandlerArgs} for this transmission.
+ * @return Returns a pass/fail for handling this transmission.
+ */
+ boolean validateTransmissionAndSend(TransmissionHandlerArgs args) {
+ if (args.getTransmission() != null && args.getTransmissionDispatcher() != null) {
+ switch (args.getResponseCode()) {
+ case HttpStatus.SC_PARTIAL_CONTENT:
+ BackendResponse backendResponse = getBackendResponse(args.getResponseBody());
+ List originalItems = generateOriginalItems(args);
+
+ // Somehow the amount of items received and the items sent do not match
+ if (backendResponse != null && (originalItems.size() != backendResponse.itemsReceived)) {
+ InternalLogger.INSTANCE.trace(
+ "Skipping partial content handler due to itemsReceived being larger than the items sent.");
+ return false;
+ }
+
+ if (backendResponse != null && (backendResponse.itemsAccepted < backendResponse.itemsReceived)) {
+ List newTransmission = new ArrayList();
+ for (BackendResponse.Error e : backendResponse.errors) {
+ switch (e.statusCode) {
+ case TransmissionSendResult.REQUEST_TIMEOUT:
+ case TransmissionSendResult.INTERNAL_SERVER_ERROR:
+ case TransmissionSendResult.SERVICE_UNAVAILABLE:
+ case TransmissionSendResult.THROTTLED:
+ case TransmissionSendResult.THROTTLED_OVER_EXTENDED_TIME:
+ // Unknown condition where backend response returns an index greater than the
+ // items we're returning
+ if (e.index < originalItems.size()) {
+ newTransmission.add(originalItems.get(e.index));
+ }
+ break;
+ }
+ }
+ return sendNewTransmission(args, newTransmission);
+ }
+ InternalLogger.INSTANCE
+ .trace("Skipping partial content handler due to itemsAccepted and itemsReceived being equal.");
+ return false;
+
+ default:
+ InternalLogger.INSTANCE.trace("Http response code %s not handled by %s", args.getResponseCode(),
+ this.getClass().getName());
+ return false;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Used to parse the original telemetry request in order to resend the failed
+ * ones.
+ *
+ * @param args
+ * The {@link TransmissionHandlerArgs} that contains the
+ * {@link Transmission} object.
+ * @return A List<> of each sent item
+ */
+ List generateOriginalItems(TransmissionHandlerArgs args) {
+ List originalItems = new ArrayList();
+
+ if (args.getTransmission().getWebContentEncodingType() == "gzip") {
+
+ try {
+ GZIPInputStream gis = new GZIPInputStream(
+ new ByteArrayInputStream(args.getTransmission().getContent()));
+ BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(gis));
+ String line;
+ while ((line = bufferedReader.readLine()) != null) {
+ originalItems.add(line);
+ }
+ if (gis != null) {
+ gis.close();
+ }
+ if (bufferedReader != null) {
+ bufferedReader.close();
+ }
+ } catch (IOException e1) {
+ InternalLogger.INSTANCE.error("IOException: Error while reading the GZIP stream.%nStack Trace:%n%s",
+ ExceptionUtils.getStackTrace(e1));
+ } catch (Throwable t) {
+ InternalLogger.INSTANCE.error("Error while reading the GZIP stream.%nStack Trace:%n%s",
+ ExceptionUtils.getStackTrace(t));
+ } finally {
+ }
+ } else {
+ for (String s : new String(args.getTransmission().getContent()).split("\r\n")) {
+ originalItems.add(s);
+ }
+ }
+ return originalItems;
+ }
+
+ /**
+ * Sends a new transmission generated from the failed attempts from the original
+ * request.
+ *
+ * @param args
+ * The {@link TransmissionHandlerArgs} object that contains the
+ * {@link TransmissionDispatcher}
+ * @param newTransmission
+ * The {@link List} of items to resent
+ * @return A pass/fail response
+ */
+ boolean sendNewTransmission(TransmissionHandlerArgs args, List newTransmission) {
+ if (!newTransmission.isEmpty()) {
+ GzipTelemetrySerializer serializer = new GzipTelemetrySerializer();
+ Optional newT = serializer.serialize(newTransmission);
+ args.getTransmissionDispatcher().dispatch(newT.get());
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Helper method to parse the 206 response. Uses {@link Gson}
+ *
+ * @param response
+ * The body of the response.
+ * @return A {@link BackendResponse} object that contains the status of the
+ * partial success.
+ */
+ private BackendResponse getBackendResponse(String response) {
+
+ BackendResponse backend = null;
+ try {
+ // Parse JSON to Java
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ Gson gson = gsonBuilder.create();
+ backend = gson.fromJson(response, BackendResponse.class);
+ } catch (Throwable t) {
+ InternalLogger.INSTANCE.trace(
+ "Error deserializing backend response with Gson.%nStack Trace:%n%s",
+ ExceptionUtils.getStackTrace(t));
+ } finally {
+ }
+ return backend;
+ }
+}
diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/SenderThreadLocalBackOffData.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/SenderThreadLocalBackOffData.java
index 96c24497b13..3beda0bce01 100644
--- a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/SenderThreadLocalBackOffData.java
+++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/SenderThreadLocalBackOffData.java
@@ -68,7 +68,7 @@ public SenderThreadLocalBackOffData(long[] backOffTimeoutsInMillis, long addMill
public boolean isTryingToSend() {
return currentBackOffIndex != -1;
}
-
+
/**
* This method should be called by the Sender thread when the
* Transmission is considered as 'done sending', which means the
@@ -92,34 +92,68 @@ public void onDoneSending() {
* 4. The thread has exhausted all the back-off timeouts
*/
public boolean backOff() {
- try {
- lock.lock();
- ++currentBackOffIndex;
- if (currentBackOffIndex == backOffTimeoutsInMillis.length) {
- currentBackOffIndex = -1;
-
- // Exhausted the back-offs
- return false;
- }
+ try {
+ lock.lock();
+ ++currentBackOffIndex;
+ if (currentBackOffIndex == backOffTimeoutsInMillis.length) {
+ currentBackOffIndex = -1;
- if (!instanceIsActive) {
+ // Exhausted the back-offs
return false;
}
- try {
- long millisecondsToWait = backOffTimeoutsInMillis[currentBackOffIndex];
- if (millisecondsToWait > BackOffTimesPolicy.MIN_TIME_TO_BACK_OFF_IN_MILLS) {
- millisecondsToWait += addMilliseconds;
- }
- backOffCondition.await(millisecondsToWait, TimeUnit.MILLISECONDS);
- return instanceIsActive;
- } catch (InterruptedException e) {
- return false;
+ if (!instanceIsActive) {
+ return false;
+ }
+
+ try {
+ long millisecondsToWait = backOffTimeoutsInMillis[currentBackOffIndex];
+ if (millisecondsToWait > BackOffTimesPolicy.MIN_TIME_TO_BACK_OFF_IN_MILLS) {
+ millisecondsToWait += addMilliseconds;
+ }
+ backOffCondition.await(millisecondsToWait, TimeUnit.MILLISECONDS);
+ return instanceIsActive;
+ } catch (InterruptedException e) {
+ return false;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Increment the current back off amount or resets the counter if needed.
+ *
+ * This method does not block but instead provides the amount of time to sleep which can be used
+ * in another method.
+ * @return The number of milliseconds to sleep for.
+ */
+ public long backOffTimerValue() {
+ try {
+ lock.lock();
+ ++currentBackOffIndex;
+ if (currentBackOffIndex == backOffTimeoutsInMillis.length) {
+ currentBackOffIndex = -1;
+
+ // Exhausted the back-offs
+ return -1;
}
- } finally {
- lock.unlock();
- }
- }
+
+ if (!instanceIsActive) {
+ return 0;
+ }
+
+
+ long millisecondsToWait = backOffTimeoutsInMillis[currentBackOffIndex];
+ if (millisecondsToWait > BackOffTimesPolicy.MIN_TIME_TO_BACK_OFF_IN_MILLS) {
+ millisecondsToWait += addMilliseconds;
+ }
+ return millisecondsToWait;
+
+ } finally {
+ lock.unlock();
+ }
+ }
/**
* Stop a waiting thread if there is one, and prevent that thread for backOffing.
diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/SenderThreadsBackOffManager.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/SenderThreadsBackOffManager.java
index b13eb614606..a8e758b23bb 100644
--- a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/SenderThreadsBackOffManager.java
+++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/SenderThreadsBackOffManager.java
@@ -63,6 +63,11 @@ public void onDoneSending() {
SenderThreadLocalBackOffData currentThreadData = this.get();
currentThreadData.onDoneSending();
}
+
+ public long backOffCurrentSenderThreadValue() {
+ SenderThreadLocalBackOffData currentThreadData = this.get();
+ return currentThreadData.backOffTimerValue();
+ }
public boolean backOffCurrentSenderThread() {
SenderThreadLocalBackOffData currentThreadData = this.get();
@@ -75,7 +80,7 @@ public synchronized void stopAllSendersBackOffActivities() {
}
stopped = true;
}
-
+
@Override
protected SenderThreadLocalBackOffData initialValue() {
int addSeconds = threadsSecondsDifference.incrementAndGet();
diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ThrottlingHandler.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ThrottlingHandler.java
new file mode 100644
index 00000000000..4af52a641d8
--- /dev/null
+++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/ThrottlingHandler.java
@@ -0,0 +1,135 @@
+package com.microsoft.applicationinsights.internal.channel.common;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.TimeZone;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.http.Header;
+
+import com.google.common.base.Strings;
+import com.microsoft.applicationinsights.internal.channel.TransmissionHandler;
+import com.microsoft.applicationinsights.internal.channel.TransmissionHandlerArgs;
+import com.microsoft.applicationinsights.internal.logger.InternalLogger;
+
+/**
+ * This class implements the retry logic for throttled requests. HTTP status
+ * code 429 and 439.
+ *
+ * @author jamdavi
+ *
+ */
+public class ThrottlingHandler implements TransmissionHandler {
+
+ private TransmissionPolicyManager transmissionPolicyManager;
+ private final static String RESPONSE_RETRY_AFTER_DATE_FORMAT = "E, dd MMM yyyy HH:mm:ss";
+
+ /**
+ * Ctor
+ *
+ * Constructs the ThrottlingHandler object.
+ *
+ * @param policy
+ * The {@link TransmissionPolicyManager} object that is needed to
+ * control the back off policy.
+ */
+ public ThrottlingHandler(TransmissionPolicyManager policy) {
+ this.transmissionPolicyManager = policy;
+ }
+
+ @Override
+ public void onTransmissionSent(TransmissionHandlerArgs args) {
+ validateTransmissionAndSend(args);
+ }
+
+ /**
+ * Provides the core logic for the retransmission
+ *
+ * @param args
+ * The {@link TransmissionHandlerArgs} for this transmission.
+ * @return Returns a pass/fail for handling this transmission.
+ */
+ boolean validateTransmissionAndSend(TransmissionHandlerArgs args) {
+ if (args.getRetryHeader() != null && args.getTransmission() != null
+ && args.getTransmissionDispatcher() != null) {
+ args.getTransmission().incrementNumberOfSends();
+ switch (args.getResponseCode()) {
+ case TransmissionSendResult.THROTTLED:
+ case TransmissionSendResult.THROTTLED_OVER_EXTENDED_TIME:
+ suspendTransmissions(TransmissionPolicy.BLOCKED_BUT_CAN_BE_PERSISTED, args.getRetryHeader());
+ args.getTransmissionDispatcher().dispatch(args.getTransmission());
+ return true;
+ default:
+ InternalLogger.INSTANCE.trace("Http response code %s not handled by %s", args.getResponseCode(),
+ this.getClass().getName());
+ return false;
+ }
+ }
+ InternalLogger.INSTANCE.trace("Http response code %s not handled by %s.", args.getResponseCode(),
+ this.getClass().getName());
+ return false;
+ }
+
+ /**
+ * Used to put the sender thread to sleep for the specified duration in the
+ * Retry-After header.
+ *
+ * @param suspensionPolicy
+ * The policy used to suspend the threads. For now we use
+ * {@link TransmissionPolicy.BLOCKED_BUT_CAN_BE_PERSISTED} for reuse
+ * of the existing logic.
+ * @param retryAfterHeader
+ * The header that is captured from the HTTP response.
+ */
+ private void suspendTransmissions(TransmissionPolicy suspensionPolicy, Header retryAfterHeader) {
+
+ if (retryAfterHeader == null) {
+ return;
+ }
+ String retryAfterAsString = retryAfterHeader.getValue();
+ if (Strings.isNullOrEmpty(retryAfterAsString)) {
+ return;
+ }
+
+ try {
+ DateFormat formatter = new SimpleDateFormat(RESPONSE_RETRY_AFTER_DATE_FORMAT);
+ Date date = formatter.parse(retryAfterAsString);
+
+ Date now = Calendar.getInstance().getTime();
+ long retryAfterAsSeconds = (date.getTime() - convertToDateToGmt(now).getTime()) / 1000;
+ this.transmissionPolicyManager.suspendInSeconds(suspensionPolicy, retryAfterAsSeconds);
+ } catch (Throwable e) {
+ InternalLogger.INSTANCE.error("Throttled but failed to block transmission.%nStack Trace:%n%s",
+ ExceptionUtils.getStackTrace(e));
+ this.transmissionPolicyManager.backoff();
+ }
+
+ }
+
+ /**
+ * Converts parsed date value to GMT for the {@link suspendTransmissions}
+ * method.
+ *
+ * @param date
+ * The date to convert to GMT
+ * @return The corrected date.
+ */
+ private static Date convertToDateToGmt(Date date) {
+ TimeZone tz = TimeZone.getDefault();
+ Date ret = new Date(date.getTime() - tz.getRawOffset());
+
+ // If we are now in DST, back off by the delta. Note that we are checking the
+ // GMT date, this is the KEY.
+ if (tz.inDaylightTime(ret)) {
+ Date dstDate = new Date(ret.getTime() - tz.getDSTSavings());
+
+ // Check to make sure we have not crossed back into standard time
+ if (tz.inDaylightTime(dstDate)) {
+ ret = dstDate;
+ }
+ }
+ return ret;
+ }
+}
diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionFileSystemOutput.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionFileSystemOutput.java
index f6da9b9bd0e..e113a2988bd 100644
--- a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionFileSystemOutput.java
+++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionFileSystemOutput.java
@@ -78,7 +78,7 @@ public final class TransmissionFileSystemOutput implements TransmissionOutput {
private final static int DELETE_TIMEOUT_ON_FAILURE_IN_MILLS = 100;
private final static int DEFAULT_CAPACITY_MEGABYTES = 10;
- private final static int MAX_CAPACITY_MEGABYTES = 100;
+ private final static int MAX_CAPACITY_MEGABYTES = 1000;
private final static int MIN_CAPACITY_MEGABYTES = 1;
private static final String MAX_TRANSMISSION_STORAGE_CAPACITY_NAME = "Channel.MaxTransmissionStorageCapacityInMB";
@@ -135,6 +135,7 @@ public TransmissionFileSystemOutput(String folderPath) {
@Override
public boolean send(Transmission transmission) {
if (size.get() >= capacityInKB) {
+ InternalLogger.INSTANCE.logAlways(InternalLogger.LoggingLevel.WARN, "Persistent storage max capcity has been reached; currently at %s KB. Telemetry will be lost, please set the MaxTransmissionStorageFilesCapacityInMB property in the configuration file.", size.get());
return false;
}
@@ -151,6 +152,7 @@ public boolean send(Transmission transmission) {
return false;
}
+ InternalLogger.INSTANCE.logAlways(InternalLogger.LoggingLevel.TRACE, "Persistent storage saved permanent file; data will be persisted and sent when the network is available.");
return true;
}
diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionNetworkOutput.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionNetworkOutput.java
index 2247c6356bf..8f7bd26028d 100644
--- a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionNetworkOutput.java
+++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionNetworkOutput.java
@@ -24,6 +24,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.microsoft.applicationinsights.internal.channel.TransmissionDispatcher;
+import com.microsoft.applicationinsights.internal.channel.TransmissionHandlerArgs;
import com.microsoft.applicationinsights.internal.channel.TransmissionOutput;
import com.microsoft.applicationinsights.internal.logger.InternalLogger;
import org.apache.http.Header;
@@ -33,294 +34,241 @@
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ConnectionPoolTimeoutException;
import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.util.EntityUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
-import java.io.BufferedReader;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
import java.net.SocketException;
import java.net.UnknownHostException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
/**
- * The class is responsible for the actual sending of {@link com.microsoft.applicationinsights.internal.channel.common.Transmission}
+ * The class is responsible for the actual sending of
+ * {@link com.microsoft.applicationinsights.internal.channel.common.Transmission}
*
* The class uses Apache's HttpClient framework for that.
*
* Created by gupele on 12/18/2014.
*/
public final class TransmissionNetworkOutput implements TransmissionOutput {
- private final static String CONTENT_TYPE_HEADER = "Content-Type";
- private final static String CONTENT_ENCODING_HEADER = "Content-Encoding";
- private final static String RESPONSE_THROTTLING_HEADER = "Retry-After";
- private final static String RESPONSE_RETRY_AFTER_DATE_FORMAT = "E, dd MMM yyyy HH:mm:ss";
+ private static final int MAX_RESEND = 3;
+ private final static String CONTENT_TYPE_HEADER = "Content-Type";
+ private final static String CONTENT_ENCODING_HEADER = "Content-Encoding";
+ private final static String RESPONSE_THROTTLING_HEADER = "Retry-After";
+
+ private final static String DEFAULT_SERVER_URI = "https://dc.services.visualstudio.com/v2/track";
+
+ // For future use: re-send a failed transmission back to the dispatcher
+ private TransmissionDispatcher transmissionDispatcher;
+
+ private final String serverUri;
+
+ private volatile boolean stopped;
+
+ // Use one instance for optimization
+ private final ApacheSender httpClient;
+
+ private TransmissionPolicyManager transmissionPolicyManager;
+
+ /**
+ * Creates an instance of the network transmission class.
+ *
+ * Will use the DEFAULT_SERVER_URI for the endpoint.
+ *
+ * @param transmissionPolicyManager
+ * The transmission policy used to mark this sender active or
+ * blocked.
+ * @return
+ */
+ public static TransmissionNetworkOutput create(TransmissionPolicyManager transmissionPolicyManager) {
+ return create(DEFAULT_SERVER_URI, transmissionPolicyManager);
+ }
+
+ /**
+ * Creates an instance of the network transmission class.
+ *
+ * @param endpoint
+ * The HTTP endpoint to send our telemetry too.
+ * @param transmissionPolicyManager
+ * The transmission policy used to mark this sender active or
+ * blocked.
+ * @return
+ */
+ public static TransmissionNetworkOutput create(String endpoint,
+ TransmissionPolicyManager transmissionPolicyManager) {
+ String realEndpoint = Strings.isNullOrEmpty(endpoint) ? DEFAULT_SERVER_URI : endpoint;
+ return new TransmissionNetworkOutput(realEndpoint, transmissionPolicyManager);
+ }
+
+ /**
+ * Private Ctor to initialize class.
+ *
+ * Also creates the httpClient using the ApacheSender instance
+ *
+ * @param serverUri
+ * The HTTP endpoint to send our telemetry too.
+ * @param transmissionPolicyManager
+ */
+ private TransmissionNetworkOutput(String serverUri, TransmissionPolicyManager transmissionPolicyManager) {
+ Preconditions.checkNotNull(serverUri, "serverUri should be a valid non-null value");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(serverUri), "serverUri should be a valid non-null value");
+ Preconditions.checkNotNull(transmissionPolicyManager,
+ "transmissionPolicyManager should be a valid non-null value");
+
+ this.serverUri = serverUri;
+
+ httpClient = ApacheSenderFactory.INSTANCE.create();
+ this.transmissionPolicyManager = transmissionPolicyManager;
+ stopped = false;
+
+ }
+
+ /**
+ * Used to inject the dispatcher used for this output so it can be injected to
+ * the retry logic.
+ *
+ * @param transmissionDispatcher
+ * The dispatcher to be injected.
+ */
+ public void setTransmissionDispatcher(TransmissionDispatcher transmissionDispatcher) {
+ this.transmissionDispatcher = transmissionDispatcher;
+ }
+
+ /**
+ * Stops all threads from sending data.
+ *
+ * @param timeout
+ * The timeout to wait, which is not relevant here.
+ * @param timeUnit
+ * The time unit, which is not relevant in this method.
+ */
+ @Override
+ public synchronized void stop(long timeout, TimeUnit timeUnit) {
+ if (stopped) {
+ return;
+ }
+
+ httpClient.close();
+ stopped = true;
+ }
+
+ /**
+ * Tries to send a
+ * {@link com.microsoft.applicationinsights.internal.channel.common.Transmission}
+ * The thread that calls that method might be suspended if there is a throttling
+ * issues, in any case the thread that enters this method is responsive for
+ * 'stop' request that might be issued by the application.
+ *
+ * @param transmission
+ * The data to send
+ * @return True when done.
+ */
+ @Override
+ public boolean send(Transmission transmission) {
+ if (!stopped) {
+ // If we're not stopped but in a blocked state then fail to second
+ // TransmissionOutput
+ if (transmissionPolicyManager.getTransmissionPolicyState()
+ .getCurrentState() != TransmissionPolicy.UNBLOCKED) {
+ return false;
+ }
+
+ HttpResponse response = null;
+ HttpPost request = null;
+ int code = 0;
+ String respString = null;
+ Throwable ex = null;
+ Header retryAfterHeader = null;
+ try {
+ // POST the transmission data to the endpoint
+ request = createTransmissionPostRequest(transmission);
+ httpClient.enhanceRequest(request);
+ response = httpClient.sendPostRequest(request);
+ HttpEntity respEntity = response.getEntity();
+ code = response.getStatusLine().getStatusCode();
+ respString = EntityUtils.toString(respEntity);
+ retryAfterHeader = response.getFirstHeader(RESPONSE_THROTTLING_HEADER);
+
+ // After the third time through this dispatcher we should reset the counter and
+ // then fail to second TransmissionOutput
+ if (code > HttpStatus.SC_PARTIAL_CONTENT && transmission.getNumberOfSends() >= MAX_RESEND) {
+ transmission.setNumberOfSends(0);
+ return false;
+ } else if (code == HttpStatus.SC_OK) {
+ // If we've completed then clear the back off flags as the channel does not need
+ // to be throttled
+ transmissionPolicyManager.clearBackoff();
+ }
+ return true;
+
+ } catch (ConnectionPoolTimeoutException e) {
+ ex = e;
+ InternalLogger.INSTANCE.error("Failed to send, connection pool timeout exception%nStack Trace:%n%s",
+ ExceptionUtils.getStackTrace(e));
+ } catch (SocketException e) {
+ ex = e;
+ InternalLogger.INSTANCE.error("Failed to send, socket exception.%nStack Trace:%n%s",
+ ExceptionUtils.getStackTrace(e));
+ } catch (UnknownHostException e) {
+ ex = e;
+ InternalLogger.INSTANCE.error(
+ "Failed to send, wrong host address or cannot reach address due to network issues.%nStack Trace:%n%s",
+ ExceptionUtils.getStackTrace(e));
+ } catch (IOException ioe) {
+ ex = ioe;
+ InternalLogger.INSTANCE.error("Failed to send.%nStack Trace:%n%s", ExceptionUtils.getStackTrace(ioe));
+ } catch (Exception e) {
+ ex = e;
+ InternalLogger.INSTANCE.error("Failed to send, unexpected exception.%nStack Trace:%n%s",
+ ExceptionUtils.getStackTrace(e));
+ } catch (Throwable t) {
+ ex = t;
+ InternalLogger.INSTANCE.error("Failed to send, unexpected error.%nStack Trace:%n%s",
+ ExceptionUtils.getStackTrace(t));
+ } finally {
+ if (request != null) {
+ request.releaseConnection();
+ }
+ httpClient.dispose(response);
+
+ if (code != HttpStatus.SC_OK && transmission.getNumberOfSends() < MAX_RESEND) {
+ // Invoke the listeners for handling things like errors
+ // The listeners will handle the back off logic as well as the dispatch
+ // operation
+ TransmissionHandlerArgs args = new TransmissionHandlerArgs();
+ args.setTransmission(transmission);
+ args.setTransmissionDispatcher(transmissionDispatcher);
+ args.setResponseBody(respString);
+ args.setResponseCode(code);
+ args.setException(ex);
+ args.setRetryHeader(retryAfterHeader);
+ this.transmissionPolicyManager.onTransmissionSent(args);
+ }
+ }
+ }
+ // If we end up here we've hit an error code we do not expect (403, 401, 400,
+ // etc.)
+ // This also means that unless there is a TransmissionHandler for this code we
+ // will not retry.
+ return true;
+ }
+
+ /**
+ * Generates the HTTP POST to send to the endpoint.
+ *
+ * @param transmission
+ * The transmission to send.
+ * @return The completed {@link HttpPost} object
+ */
+ private HttpPost createTransmissionPostRequest(Transmission transmission) {
+ HttpPost request = new HttpPost(serverUri);
+ request.addHeader(CONTENT_TYPE_HEADER, transmission.getWebContentType());
+ request.addHeader(CONTENT_ENCODING_HEADER, transmission.getWebContentEncodingType());
+
+ ByteArrayEntity bae = new ByteArrayEntity(transmission.getContent());
+ request.setEntity(bae);
+
+ return request;
+ }
- private final static String DEFAULT_SERVER_URI = "https://dc.services.visualstudio.com/v2/track";
- private final static int DEFAULT_BACKOFF_TIME_SECONDS = 300;
-
- // For future use: re-send a failed transmission back to the dispatcher
- private TransmissionDispatcher transmissionDispatcher;
-
- private final String serverUri;
-
- private volatile boolean stopped;
-
- // Use one instance for optimization
- private final ApacheSender httpClient;
-
- private TransmissionPolicyManager transmissionPolicyManager;
-
- public static TransmissionNetworkOutput create(TransmissionPolicyManager transmissionPolicyManager) {
- return create(DEFAULT_SERVER_URI, transmissionPolicyManager);
- }
-
- public static TransmissionNetworkOutput create(String endpoint, TransmissionPolicyManager transmissionPolicyManager) {
- String realEndpoint = Strings.isNullOrEmpty(endpoint) ? DEFAULT_SERVER_URI : endpoint;
- return new TransmissionNetworkOutput(realEndpoint, transmissionPolicyManager);
- }
-
- private TransmissionNetworkOutput(String serverUri, TransmissionPolicyManager transmissionPolicyManager) {
- Preconditions.checkNotNull(serverUri, "serverUri should be a valid non-null value");
- Preconditions.checkArgument(!Strings.isNullOrEmpty(serverUri), "serverUri should be a valid non-null value");
- Preconditions.checkNotNull(transmissionPolicyManager, "transmissionPolicyManager should be a valid non-null value");
-
- this.serverUri = serverUri;
-
- httpClient = ApacheSenderFactory.INSTANCE.create();
- this.transmissionPolicyManager = transmissionPolicyManager;
- stopped = false;
- }
-
- public void setTransmissionDispatcher(TransmissionDispatcher transmissionDispatcher) {
- this.transmissionDispatcher = transmissionDispatcher;
- }
-
- /**
- * Stops all threads from sending data.
- * @param timeout The timeout to wait, which is not relevant here.
- * @param timeUnit The time unit, which is not relevant in this method.
- */
- @Override
- public synchronized void stop(long timeout, TimeUnit timeUnit) {
- if (stopped) {
- return;
- }
-
- httpClient.close();
- stopped = true;
- }
-
- /**
- * Tries to send a {@link com.microsoft.applicationinsights.internal.channel.common.Transmission}
- * The thread that calls that method might be suspended if there is a throttling issues, in any case
- * the thread that enters this method is responsive for 'stop' request that might be issued by the application.
- * @param transmission The data to send
- * @return True when done.
- */
- @Override
- public boolean send(Transmission transmission) {
- while (!stopped) {
- if (transmissionPolicyManager.getTransmissionPolicyState().getCurrentState() != TransmissionPolicy.UNBLOCKED) {
- return false;
- }
-
- HttpResponse response = null;
- HttpPost request = null;
- boolean shouldBackoff = false;
- try {
- request = createTransmissionPostRequest(transmission);
- httpClient.enhanceRequest(request);
-
- response = httpClient.sendPostRequest(request);
-
- HttpEntity respEntity = response.getEntity();
- int code = response.getStatusLine().getStatusCode();
-
- TransmissionSendResult sendResult = translateResponse(code, respEntity);
- switch (sendResult) {
- case PAYMENT_REQUIRED:
- case THROTTLED:
- suspendTransmissions(TransmissionPolicy.BLOCKED_BUT_CAN_BE_PERSISTED, response);
- break;
-
- case THROTTLED_OVER_EXTENDED_TIME:
- suspendTransmissions(TransmissionPolicy.BLOCKED_AND_CANNOT_BE_PERSISTED, response);
- break;
-
- default:
- return true;
- }
- } catch (ConnectionPoolTimeoutException e) {
- InternalLogger.INSTANCE.error("Failed to send, connection pool timeout exception");
- shouldBackoff = true;
- } catch (SocketException e) {
- InternalLogger.INSTANCE.error("Failed to send, socket timeout exception");
- shouldBackoff = true;
- } catch (UnknownHostException e) {
- InternalLogger.INSTANCE.error("Failed to send, wrong host address or cannot reach address due to network issues, exception: %s", e.getMessage());
- shouldBackoff = true;
- } catch (IOException ioe) {
- InternalLogger.INSTANCE.error("Failed to send, exception: %s", ioe.getMessage());
- shouldBackoff = true;
- } catch (Exception e) {
- InternalLogger.INSTANCE.error("Failed to send, unexpected exception: %s", e.getMessage());
- shouldBackoff = true;
- } catch (Throwable t) {
- InternalLogger.INSTANCE.error("Failed to send, unexpected error: %s", t.getMessage());
- shouldBackoff = true;
- }
- finally {
- if (request != null) {
- request.releaseConnection();
- }
- httpClient.dispose(response);
- // backoff before trying again
- if (shouldBackoff) {
- InternalLogger.INSTANCE.trace("Backing off for %s seconds", DEFAULT_BACKOFF_TIME_SECONDS);
- transmissionPolicyManager.suspendInSeconds(TransmissionPolicy.BLOCKED_BUT_CAN_BE_PERSISTED, DEFAULT_BACKOFF_TIME_SECONDS);
- }
- }
- }
-
- return true;
- }
-
- private void suspendTransmissions(TransmissionPolicy suspensionPolicy, HttpResponse response) {
- Header retryAfterHeader = response.getFirstHeader(RESPONSE_THROTTLING_HEADER);
- if (retryAfterHeader == null) {
- return;
- }
-
- String retryAfterAsString = retryAfterHeader.getValue();
- if (Strings.isNullOrEmpty(retryAfterAsString)) {
- return;
- }
-
- try {
- DateFormat formatter = new SimpleDateFormat(RESPONSE_RETRY_AFTER_DATE_FORMAT);
- Date date = formatter.parse(retryAfterAsString);
-
- Date now = Calendar.getInstance().getTime();
- long retryAfterAsSeconds = (date.getTime() - convertToDateToGmt(now).getTime())/1000;
- transmissionPolicyManager.suspendInSeconds(suspensionPolicy, retryAfterAsSeconds);
- } catch (Throwable e) {
- InternalLogger.INSTANCE.logAlways(InternalLogger.LoggingLevel.ERROR, "Throttled but failed to block transmission, exception: %s", e.getMessage());
- }
- }
-
- private static Date convertToDateToGmt(Date date){
- TimeZone tz = TimeZone.getDefault();
- Date ret = new Date(date.getTime() - tz.getRawOffset());
-
- // If we are now in DST, back off by the delta. Note that we are checking the GMT date, this is the KEY.
- if (tz.inDaylightTime(ret)) {
- Date dstDate = new Date(ret.getTime() - tz.getDSTSavings());
-
- // Check to make sure we have not crossed back into standard time
- if (tz.inDaylightTime(dstDate)) {
- ret = dstDate;
- }
- }
- return ret;
- }
-
- private TransmissionSendResult translateResponse(int code, HttpEntity respEntity) {
- if (code == HttpStatus.SC_OK) {
- return TransmissionSendResult.SENT_SUCCESSFULLY;
- }
-
- TransmissionSendResult result;
-
- String errorMessage;
- if (code < HttpStatus.SC_OK ||
- (code >= HttpStatus.SC_MULTIPLE_CHOICES && code < HttpStatus.SC_BAD_REQUEST) ||
- code > HttpStatus.SC_INTERNAL_SERVER_ERROR) {
-
- errorMessage = String.format("Unexpected response code: %d", code);
- result = TransmissionSendResult.REJECTED_BY_SERVER;
- } else {
- switch (code) {
- case HttpStatus.SC_BAD_REQUEST:
- errorMessage = "Bad request ";
- result = TransmissionSendResult.BAD_REQUEST;
- break;
-
- case 429:
- result = TransmissionSendResult.THROTTLED;
- errorMessage = "Throttling (All messages of the transmission were rejected) ";
- break;
-
- case 439:
- result = TransmissionSendResult.THROTTLED_OVER_EXTENDED_TIME;
- errorMessage = "Throttling extended";
- break;
-
- case 402:
- result = TransmissionSendResult.PAYMENT_REQUIRED;
- errorMessage = "Throttling: payment required";
- break;
-
- case HttpStatus.SC_PARTIAL_CONTENT:
- result = TransmissionSendResult.PARTIALLY_THROTTLED;
- errorMessage = "Throttling (Partial messages of the transmission were rejected) ";
- break;
-
- case HttpStatus.SC_INTERNAL_SERVER_ERROR:
- errorMessage = "Internal server error ";
- result = TransmissionSendResult.INTERNAL_SERVER_ERROR;
- break;
-
- default:
- result = TransmissionSendResult.REJECTED_BY_SERVER;
- errorMessage = String.format("Error, response code: %d", code);
- break;
- }
- }
-
- logError(errorMessage, respEntity);
- return result;
- }
-
- private void logError(String baseErrorMessage, HttpEntity respEntity) {
- if (respEntity == null || !InternalLogger.INSTANCE.isErrorEnabled()) {
- InternalLogger.INSTANCE.error(baseErrorMessage);
- return;
- }
-
- InputStream inputStream = null;
- try {
- inputStream = respEntity.getContent();
- InputStreamReader streamReader = new InputStreamReader(inputStream, "UTF-8");
- BufferedReader reader = new BufferedReader(streamReader);
- String responseLine = reader.readLine();
- respEntity.getContent().close();
-
- InternalLogger.INSTANCE.error("Failed to send, %s : %s", baseErrorMessage, responseLine);
- } catch (IOException e) {
- InternalLogger.INSTANCE.error("Failed to send, %s, failed to log the error", baseErrorMessage);
- } finally {
- if (inputStream != null) {
- try {
- inputStream.close();
- } catch (IOException e) {
- }
- }
- }
- }
-
- private HttpPost createTransmissionPostRequest(Transmission transmission) {
- HttpPost request = new HttpPost(serverUri);
- request.addHeader(CONTENT_TYPE_HEADER, transmission.getWebContentType());
- request.addHeader(CONTENT_ENCODING_HEADER, transmission.getWebContentEncodingType());
-
- ByteArrayEntity bae = new ByteArrayEntity(transmission.getContent());
- request.setEntity(bae);
-
- return request;
- }
}
diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionPolicy.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionPolicy.java
index b655e933345..6e31644a139 100644
--- a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionPolicy.java
+++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionPolicy.java
@@ -27,5 +27,6 @@
enum TransmissionPolicy {
UNBLOCKED,
BLOCKED_BUT_CAN_BE_PERSISTED,
- BLOCKED_AND_CANNOT_BE_PERSISTED
+ BLOCKED_AND_CANNOT_BE_PERSISTED,
+ BACKOFF
}
diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionPolicyManager.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionPolicyManager.java
index 473e6ebe872..8f847fad330 100644
--- a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionPolicyManager.java
+++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionPolicyManager.java
@@ -21,20 +21,24 @@
package com.microsoft.applicationinsights.internal.channel.common;
+import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
+import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.base.Preconditions;
+import com.microsoft.applicationinsights.internal.channel.TransmissionHandler;
+import com.microsoft.applicationinsights.internal.channel.TransmissionHandlerArgs;
+import com.microsoft.applicationinsights.internal.channel.TransmissionHandlerObserver;
import com.microsoft.applicationinsights.internal.logger.InternalLogger;
import com.microsoft.applicationinsights.internal.shutdown.SDKShutdownActivity;
import com.microsoft.applicationinsights.internal.shutdown.Stoppable;
import com.microsoft.applicationinsights.internal.util.ThreadPoolUtils;
-import com.google.common.base.Preconditions;
-
/**
* This class is responsible for managing the transmission state.
*
@@ -46,8 +50,17 @@
*
* Created by gupele on 6/29/2015.
*/
-public final class TransmissionPolicyManager implements Stoppable {
-
+public final class TransmissionPolicyManager implements Stoppable, TransmissionHandlerObserver {
+
+ private int INSTANT_RETRY_AMOUNT = 3; // Should always be set by the creator of this class
+ private int INSTANT_RETRY_MAX = 10; // Stops us from getting into an endless loop
+
+ // Current thread backoff manager
+ private SenderThreadsBackOffManager backoffManager;
+
+ // List of transmission policies implemented as handlers
+ private List transmissionHandlers;
+
// The future date the the transmission is blocked
private Date suspensionDate;
@@ -80,11 +93,45 @@ public void run() {
}
}
+ /**
+ * Create the {@link TransmissionPolicyManager} and set the ability to throttle.
+ * @param throttlingIsEnabled Set whether the {@link TransmissionPolicyManager} can be throttled.
+ */
public TransmissionPolicyManager(boolean throttlingIsEnabled) {
suspensionDate = null;
this.throttlingIsEnabled = throttlingIsEnabled;
+ this.transmissionHandlers = new ArrayList();
+ this.backoffManager = new SenderThreadsBackOffManager(new ExponentialBackOffTimesPolicy());
}
+ /**
+ * Suspend the transmission thread according to the current back off policy.
+ */
+ public void backoff() {
+ policyState.setCurrentState(TransmissionPolicy.BACKOFF);
+ long backOffMillis = backoffManager.backOffCurrentSenderThreadValue();
+ if (backOffMillis > 0)
+ {
+ long backOffSeconds = backOffMillis / 1000;
+ InternalLogger.INSTANCE.logAlways(InternalLogger.LoggingLevel.TRACE, "App is throttled, telemetry will be blocked for %s seconds.", backOffSeconds);
+ this.suspendInSeconds(TransmissionPolicy.BACKOFF, backOffSeconds);
+ }
+ }
+
+ /**
+ * Clear the current thread state and and reset the back off counter.
+ */
+ public void clearBackoff() {
+ policyState.setCurrentState(TransmissionPolicy.UNBLOCKED);
+ backoffManager.onDoneSending();
+ InternalLogger.INSTANCE.logAlways(InternalLogger.LoggingLevel.TRACE, "Backoff has been reset.");
+ }
+
+ /**
+ * Suspend this transmission thread using the specified policy
+ * @param policy The {@link TransmissionPolicy} to use for suspension
+ * @param suspendInSeconds The number of seconds to suspend.
+ */
public void suspendInSeconds(TransmissionPolicy policy, long suspendInSeconds) {
if (!throttlingIsEnabled) {
return;
@@ -97,11 +144,18 @@ public void suspendInSeconds(TransmissionPolicy policy, long suspendInSeconds) {
doSuspend(policy, suspendInSeconds);
}
+ /**
+ * Stop this transmission thread from sending.
+ */
@Override
public synchronized void stop(long timeout, TimeUnit timeUnit) {
ThreadPoolUtils.stop(threads, timeout, timeUnit);
}
+ /**
+ * Get the policy state fetcher
+ * @return A {@link TransmissionPolicyStateFetcher} object
+ */
public TransmissionPolicyStateFetcher getTransmissionPolicyState() {
return policyState;
}
@@ -111,7 +165,7 @@ private synchronized void doSuspend(TransmissionPolicy policy, long suspendInSec
if (policy == TransmissionPolicy.UNBLOCKED ) {
return;
}
-
+
Date date = Calendar.getInstance().getTime();
date.setTime(date.getTime() + 1000 * suspendInSeconds);
if (this.suspensionDate != null) {
@@ -160,4 +214,36 @@ public Thread newThread(Runnable r) {
SDKShutdownActivity.INSTANCE.register(this);
}
+
+ @Override
+ public void onTransmissionSent(TransmissionHandlerArgs transmissionArgs) {
+ for (TransmissionHandler handler : this.transmissionHandlers) {
+ handler.onTransmissionSent(transmissionArgs);
+ }
+ }
+
+ @Override
+ public void addTransmissionHandler(TransmissionHandler handler) {
+ if(handler != null) {
+ this.transmissionHandlers.add(handler);
+ }
+ }
+
+ /**
+ * Set the number of retries before performing a back off operation.
+ * @param maxInstantRetries Number of retries
+ */
+ public void setMaxInstantRetries(int maxInstantRetries) {
+ if (maxInstantRetries >= 0 && maxInstantRetries < INSTANT_RETRY_MAX) {
+ INSTANT_RETRY_AMOUNT = maxInstantRetries;
+ }
+ }
+
+ /**
+ * Get the number of retries before performing a back off operation.
+ * @return Number of retries
+ */
+ public int getMaxInstantRetries() {
+ return INSTANT_RETRY_AMOUNT;
+ }
}
diff --git a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionSendResult.java b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionSendResult.java
index 6a66f8dc755..f87e5595a4a 100644
--- a/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionSendResult.java
+++ b/core/src/main/java/com/microsoft/applicationinsights/internal/channel/common/TransmissionSendResult.java
@@ -24,26 +24,12 @@
/**
* Created by gupele on 2/10/2015.
*/
-enum TransmissionSendResult {
- SENT_SUCCESSFULLY,
-
- FAILED_TO_SEND_DUE_TO_NETWORK_ISSUES,
- FAILED_TO_SEND_DUE_TO_CONNECTION_POOL,
-
- FAILED_TO_RECEIVE_DUE_TO_TIMEOUT,
-
- FAILED_TO_READ_RESPONSE,
-
- // Got response
- BAD_REQUEST,
- INTERNAL_SERVER_ERROR,
-
- THROTTLED,
- THROTTLED_OVER_EXTENDED_TIME,
- PAYMENT_REQUIRED,
-
- PARTIALLY_THROTTLED,
-
- REJECTED_BY_SERVER,
- UNKNOWN_ERROR,
+final class TransmissionSendResult {
+ public final static int SENT_SUCCESSFULLY = 200;
+ public final static int PARTIAL_SUCCESS = 206;
+ public final static int REQUEST_TIMEOUT = 408;
+ public final static int THROTTLED = 429;
+ public final static int THROTTLED_OVER_EXTENDED_TIME = 439;
+ public final static int INTERNAL_SERVER_ERROR = 500;
+ public final static int SERVICE_UNAVAILABLE = 503;
}
diff --git a/core/src/test/java/com/microsoft/applicationinsights/extensibility/initializer/docker/Constants.java b/core/src/test/java/com/microsoft/applicationinsights/extensibility/initializer/docker/ConstantsTest.java
similarity index 98%
rename from core/src/test/java/com/microsoft/applicationinsights/extensibility/initializer/docker/Constants.java
rename to core/src/test/java/com/microsoft/applicationinsights/extensibility/initializer/docker/ConstantsTest.java
index 27ae7f7d3bb..421d682e0a8 100644
--- a/core/src/test/java/com/microsoft/applicationinsights/extensibility/initializer/docker/Constants.java
+++ b/core/src/test/java/com/microsoft/applicationinsights/extensibility/initializer/docker/ConstantsTest.java
@@ -24,6 +24,6 @@
/**
* Created by yonisha on 8/2/2015.
*/
-public class Constants {
+public class ConstantsTest {
public static final String CONTEXT_FILE_PATTERN = "Docker host=%s,Docker image=%s,Docker container name=%s,Docker container id=%s";
}
diff --git a/core/src/test/java/com/microsoft/applicationinsights/extensibility/initializer/docker/DockerContextInitializerTests.java b/core/src/test/java/com/microsoft/applicationinsights/extensibility/initializer/docker/DockerContextInitializerTests.java
index a5f788938b6..f11e7d8fbdb 100644
--- a/core/src/test/java/com/microsoft/applicationinsights/extensibility/initializer/docker/DockerContextInitializerTests.java
+++ b/core/src/test/java/com/microsoft/applicationinsights/extensibility/initializer/docker/DockerContextInitializerTests.java
@@ -56,7 +56,7 @@ public class DockerContextInitializerTests {
@BeforeClass
public static void classInit() throws Exception {
- String json = String.format(com.microsoft.applicationinsights.extensibility.initializer.docker.Constants.CONTEXT_FILE_PATTERN, DEFAULT_HOST, DEFAULT_IMAGE, DEFAULT_CONTAINER_NAME, DEFAULT_CONTAINER_ID);
+ String json = String.format(com.microsoft.applicationinsights.extensibility.initializer.docker.ConstantsTest.CONTEXT_FILE_PATTERN, DEFAULT_HOST, DEFAULT_IMAGE, DEFAULT_CONTAINER_NAME, DEFAULT_CONTAINER_ID);
defaultDockerContext = new DockerContext(json);
initializerUnderTest = new DockerContextInitializer(fileFactoryMock, contextPollerMock);
}
diff --git a/core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/ErrorHandlerTest.java b/core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/ErrorHandlerTest.java
new file mode 100644
index 00000000000..aa74faa7acd
--- /dev/null
+++ b/core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/ErrorHandlerTest.java
@@ -0,0 +1,107 @@
+package com.microsoft.applicationinsights.internal.channel.common;
+
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.microsoft.applicationinsights.internal.channel.TransmissionDispatcher;
+import com.microsoft.applicationinsights.internal.channel.TransmissionHandlerArgs;
+import com.microsoft.applicationinsights.internal.channel.common.ErrorHandler;
+import com.microsoft.applicationinsights.internal.channel.common.Transmission;
+import com.microsoft.applicationinsights.internal.channel.common.TransmissionPolicyManager;
+
+
+public class ErrorHandlerTest {
+
+
+ private boolean generateTransmissionWithStatusCode(int code) {
+ TransmissionPolicyManager tpm = new TransmissionPolicyManager(true);
+ TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class);
+ TransmissionHandlerArgs args = new TransmissionHandlerArgs();
+ args.setResponseCode(code);
+ args.setTransmission(new Transmission(new byte[] { 0 }, "testcontent", "testencoding"));
+ args.setTransmissionDispatcher(mockedDispatcher);
+ ErrorHandler eh = new ErrorHandler(tpm);
+ boolean result = eh.validateTransmissionAndSend(args);
+ return result;
+ }
+
+ @Test
+ public void failOnNull() {
+ TransmissionPolicyManager tpm = new TransmissionPolicyManager(true);
+ TransmissionHandlerArgs args = new TransmissionHandlerArgs();
+ ErrorHandler eh = new ErrorHandler(tpm);
+ boolean result = eh.validateTransmissionAndSend(args);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail200Status() {
+ boolean result = generateTransmissionWithStatusCode(200);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail206Status() {
+ boolean result = generateTransmissionWithStatusCode(206);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail400Status() {
+ boolean result = generateTransmissionWithStatusCode(400);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail404Status() {
+ boolean result = generateTransmissionWithStatusCode(404);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail429Status() {
+ boolean result = generateTransmissionWithStatusCode(429);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail439Status() {
+ boolean result = generateTransmissionWithStatusCode(439);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void pass408Status() {
+ boolean result = generateTransmissionWithStatusCode(408);
+ Assert.assertTrue(result);
+ }
+
+ @Test
+ public void pass500Status() {
+ boolean result = generateTransmissionWithStatusCode(500);
+ Assert.assertTrue(result);
+ }
+
+ @Test
+ public void pass503Status() {
+ boolean result = generateTransmissionWithStatusCode(503);
+ Assert.assertTrue(result);
+ }
+
+ @Test
+ public void passException() {
+ TransmissionPolicyManager tpm = new TransmissionPolicyManager(true);
+ TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class);
+ TransmissionHandlerArgs args = new TransmissionHandlerArgs();
+ args.setResponseCode(0);
+ args.setTransmission(null);
+ args.setTransmissionDispatcher(mockedDispatcher);
+ args.setException(new Exception("Mocked"));
+ ErrorHandler eh = new ErrorHandler(tpm);
+ boolean result = eh.validateTransmissionAndSend(args);
+ Assert.assertTrue(result);
+ }
+
+}
diff --git a/core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/PartialSuccessHandlerTest.java b/core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/PartialSuccessHandlerTest.java
new file mode 100644
index 00000000000..3731d41da46
--- /dev/null
+++ b/core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/PartialSuccessHandlerTest.java
@@ -0,0 +1,266 @@
+package com.microsoft.applicationinsights.internal.channel.common;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.microsoft.applicationinsights.internal.channel.TransmissionDispatcher;
+import com.microsoft.applicationinsights.internal.channel.TransmissionHandlerArgs;
+import com.microsoft.applicationinsights.internal.channel.common.ErrorHandler;
+import com.microsoft.applicationinsights.internal.channel.common.PartialSuccessHandler;
+import com.microsoft.applicationinsights.internal.channel.common.Transmission;
+import com.microsoft.applicationinsights.internal.channel.common.TransmissionPolicyManager;
+
+public class PartialSuccessHandlerTest {
+
+ private final byte[] fourItems = new byte[] {31, -117, 8, 0, 0, 0, 0, 0, 0, 0, -19, -110, 77, 79, 2, 49, 16, -122, -17, 38, -2, 7, -45, -13, -74, 105, -69, -80, -127, -67, -111, -32, 1, 13, 23, 69, 60, 15, -69, 3, 84, 119, -37, 77, 91, 49, -124, -16, -33, 109, 23, 69, 49, -98, -12, 32, -121, -67, -50, -41, 59, -13, -50, -77, 35, 27, -76, 36, 23, 9, -47, 80, 35, -55, -55, 84, 21, -42, 56, -77, -12, 108, -44, 52, -107, 42, -64, 43, -93, 39, -38, -87, -43, -38, 59, -74, -56, -122, -112, 2, -49, 80, -10, -95, 39, -5, 11, 16, -48, -21, -31, 112, -71, -52, -106, 34, -19, 15, 36, -69, -34, -96, -10, 36, 33, 94, -75, -45, 36, 23, 3, -54, 37, 21, 98, 38, -78, -100, -53, 60, -51, -104, -112, -100, -14, 62, -25, -95, -54, 65, -35, 84, 120, 7, 62, -44, 10, -50, 25, 79, -120, -70, -59, 109, 104, -4, 16, -94, 81, -119, 70, 41, 26, -75, -24, 87, -79, 40, 3, 43, 71, -14, 29, 1, -59, -108, -10, 104, 53, 84, -52, -107, -49, 115, -76, 46, -84, 29, -26, 60, -63, 6, 114, -55, -62, 104, -70, 64, 15, -44, 105, 104, -36, -38, -60, 21, 67, 79, -119, 27, 85, 32, 83, 101, -88, -68, 25, 77, -57, -93, -7, -124, 78, 123, 3, -50, -87, 96, 22, -53, -38, -24, -110, 21, -58, 54, -84, 62, -70, 82, -104, -6, -92, -73, 50, 5, 84, -15, 84, -44, -12, -31, -2, -112, 58, -82, -94, 77, -119, -17, -66, -2, 114, -68, 9, -25, -111, 71, -91, 75, -13, -22, -82, 4, 63, -55, 89, 83, 97, -8, -116, 7, 93, -4, 73, -31, -45, -83, -17, 66, 14, 93, -52, 28, 12, -118, -65, -28, 82, 8, -111, 113, -103, -90, 100, -97, -112, 18, 60, 68, -9, 23, -32, 112, -74, 109, -30, 18, -19, -1, -57, 49, -98, -76, -31, -15, 123, 73, 75, -103, 60, 82, 54, 67, -25, -37, -46, 40, -44, 88, -45, -96, -11, 10, -61, -83, -6, -91, -86, -10, -5, -3, -27, -59, -18, 31, -64, 76, 69, 7, 102, 7, -26, 1, 76, -47, -127, -39, -127, 121, -114, 96, -54, -77, 2, 83, 118, 96, 118, 96, 30, -64, 76, 127, 6, -13, 13, -51, 46, -90, -77, 98, 10, 0, 0} ;
+ private final String fourItemsNonGZIP = "{\"ver\":1,\"name\":\"Microsoft.ApplicationInsights.b69a3a06e25a425ba1a44e9ff6f13582.Event\",\"time\":\"2018-02-11T16:02:36.120-0500\",\"sampleRate\":100.0,\"iKey\":\"b69a3a06-e25a-425b-a1a4-4e9ff6f13582\",\"tags\":{\"ai.internal.sdkVersion\":\"java:2.0.0-beta-snapshot\",\"ai.device.id\":\"test.machine.name\",\"ai.device.locale\":\"en-US\",\"ai.internal.nodename\":\"test.machine.name\",\"ai.device.os\":\"Windows 10\",\"ai.device.roleInstance\":\"test.machine.name\",\"ai.device.osVersion\":\"Windows 10\",\"ai.session.id\":\"20180211160233\"},\"data\":{\"baseType\":\"EventData\",\"baseData\":{\"ver\":2,\"name\":\"TestEvent0\",\"properties\":null}}}\r\n" +
+ "{\"ver\":1,\"name\":\"Microsoft.ApplicationInsights.b69a3a06e25a425ba1a44e9ff6f13582.Event\",\"time\":\"2018-02-11T16:02:36.131-0500\",\"sampleRate\":100.0,\"iKey\":\"b69a3a06-e25a-425b-a1a4-4e9ff6f13582\",\"tags\":{\"ai.internal.sdkVersion\":\"java:2.0.0-beta-snapshot\",\"ai.device.id\":\"test.machine.name\",\"ai.device.locale\":\"en-US\",\"ai.internal.nodename\":\"test.machine.name\",\"ai.device.os\":\"Windows 10\",\"ai.device.roleInstance\":\"test.machine.name\",\"ai.device.osVersion\":\"Windows 10\",\"ai.session.id\":\"20180211160233\"},\"data\":{\"baseType\":\"EventData\",\"baseData\":{\"ver\":2,\"name\":\"TestEvent1\",\"properties\":null}}}\r\n" +
+ "{\"ver\":1,\"name\":\"Microsoft.ApplicationInsights.b69a3a06e25a425ba1a44e9ff6f13582.Event\",\"time\":\"2018-02-11T16:02:36.131-0500\",\"sampleRate\":100.0,\"iKey\":\"b69a3a06-e25a-425b-a1a4-4e9ff6f13582\",\"tags\":{\"ai.internal.sdkVersion\":\"java:2.0.0-beta-snapshot\",\"ai.device.id\":\"test.machine.name\",\"ai.device.locale\":\"en-US\",\"ai.internal.nodename\":\"test.machine.name\",\"ai.device.os\":\"Windows 10\",\"ai.device.roleInstance\":\"test.machine.name\",\"ai.device.osVersion\":\"Windows 10\",\"ai.session.id\":\"20180211160233\"},\"data\":{\"baseType\":\"EventData\",\"baseData\":{\"ver\":2,\"name\":\"TestEvent2\",\"properties\":null}}}\r\n" +
+ "{\"ver\":1,\"name\":\"Microsoft.ApplicationInsights.b69a3a06e25a425ba1a44e9ff6f13582.Event\",\"time\":\"2018-02-11T16:02:36.132-0500\",\"sampleRate\":100.0,\"iKey\":\"b69a3a06-e25a-425b-a1a4-4e9ff6f13582\",\"tags\":{\"ai.internal.sdkVersion\":\"java:2.0.0-beta-snapshot\",\"ai.device.id\":\"test.machine.name\",\"ai.device.locale\":\"en-US\",\"ai.internal.nodename\":\"test.machine.name\",\"ai.device.os\":\"Windows 10\",\"ai.device.roleInstance\":\"test.machine.name\",\"ai.device.osVersion\":\"Windows 10\",\"ai.session.id\":\"20180211160233\"},\"data\":{\"baseType\":\"EventData\",\"baseData\":{\"ver\":2,\"name\":\"TestEvent3\",\"properties\":null}}}";
+
+ private boolean generateTransmissionWithStatusCode(int code) {
+ TransmissionPolicyManager tpm = new TransmissionPolicyManager(true);
+ TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class);
+ TransmissionHandlerArgs args = new TransmissionHandlerArgs();
+ args.setResponseCode(code);
+ args.setTransmission(new Transmission(new byte[] { 0 }, "testcontent", "testencoding"));
+ args.setTransmissionDispatcher(mockedDispatcher);
+ PartialSuccessHandler eh = new PartialSuccessHandler(tpm);
+ boolean result = eh.validateTransmissionAndSend(args);
+ return result;
+ }
+
+ private boolean generateTransmissionWithPartialResult(String responseBody) {
+ TransmissionPolicyManager tpm = new TransmissionPolicyManager(true);
+ TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class);
+ TransmissionHandlerArgs args = new TransmissionHandlerArgs();
+ args.setResponseCode(206);
+ args.setTransmission(new Transmission(fourItems, "application/x-json-stream", "gzip"));
+ args.setTransmissionDispatcher(mockedDispatcher);
+ args.setResponseBody(responseBody);
+ PartialSuccessHandler eh = new PartialSuccessHandler(tpm);
+ boolean result = eh.validateTransmissionAndSend(args);
+ return result;
+ }
+
+ @Test
+ public void failOnNull() {
+ TransmissionPolicyManager tpm = new TransmissionPolicyManager(true);
+ TransmissionHandlerArgs args = new TransmissionHandlerArgs();
+ ErrorHandler eh = new ErrorHandler(tpm);
+ boolean result = eh.validateTransmissionAndSend(args);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail200Status() {
+ boolean result = generateTransmissionWithStatusCode(200);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail400Status() {
+ boolean result = generateTransmissionWithStatusCode(400);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail404Status() {
+ boolean result = generateTransmissionWithStatusCode(404);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail408Status() {
+ boolean result = generateTransmissionWithStatusCode(408);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail500Status() {
+ boolean result = generateTransmissionWithStatusCode(500);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail503Status() {
+ boolean result = generateTransmissionWithStatusCode(503);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail429Status() {
+ boolean result = generateTransmissionWithStatusCode(429);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail439Status() {
+ boolean result = generateTransmissionWithStatusCode(439);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void failEmptyArrayList() {
+ TransmissionPolicyManager tpm = new TransmissionPolicyManager(true);
+ TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class);
+ TransmissionHandlerArgs args = new TransmissionHandlerArgs();
+ args.setResponseCode(206);
+ args.setTransmission(new Transmission(fourItems, "application/x-json-stream", "gzip"));
+ args.setTransmissionDispatcher(mockedDispatcher);
+ PartialSuccessHandler eh = new PartialSuccessHandler(tpm);
+ boolean result = eh.sendNewTransmission(args, new ArrayList());
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail206StatusSkipAcceptedRecieved() {
+ String validResult = "{\r\n" +
+ " \"itemsReceived\": 4,\r\n" +
+ " \"itemsAccepted\": 4,\r\n" +
+ " \"errors\": []\r\n }";
+ boolean result = generateTransmissionWithPartialResult(validResult);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail206StatusSkipRecievedLargerThanSent() {
+ String validResult = "{\r\n" +
+ " \"itemsReceived\": 10,\r\n" +
+ " \"itemsAccepted\": 9,\r\n" +
+ " \"errors\": [{\r\n" +
+ " \"index\": 0,\r\n" +
+ " \"statusCode\": 400,\r\n" +
+ " \"message\": \"109: Field 'startTime' on type 'RequestData' is required but missing or empty. Expected: string, Actual: undefined\"\r\n" +
+ " }]\r\n }";
+ boolean result = generateTransmissionWithPartialResult(validResult);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail206IndexOutOfRange() {
+ String validResult = "{\r\n" +
+ " \"itemsReceived\": 4,\r\n" +
+ " \"itemsAccepted\": 1,\r\n" +
+ " \"errors\": [\r\n" +
+ " {\r\n" +
+ " \"index\": 20,\r\n" +
+ " \"statusCode\": 400,\r\n" +
+ " \"message\": \"109: Field 'startTime' on type 'RequestData' is required but missing or empty. Expected: string, Actual: undefined\"\r\n" +
+ " },\r\n" +
+ " {\r\n" +
+ " \"index\": 12,\r\n" +
+ " \"statusCode\": 500,\r\n" +
+ " \"message\": \"Internal Server Error\"\r\n" +
+ " },\r\n" +
+ " {\r\n" +
+ " \"index\": 22,\r\n" +
+ " \"statusCode\": 439,\r\n" +
+ " \"message\": \"Too many requests\"\r\n" +
+ " }\r\n" +
+ " ]\r\n" +
+ "}";
+ boolean result = generateTransmissionWithPartialResult(validResult);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void pass206MixedIndexOutOfRange() {
+ String validResult = "{\r\n" +
+ " \"itemsReceived\": 4,\r\n" +
+ " \"itemsAccepted\": 1,\r\n" +
+ " \"errors\": [\r\n" +
+ " {\r\n" +
+ " \"index\": 5,\r\n" +
+ " \"statusCode\": 400,\r\n" +
+ " \"message\": \"109: Field 'startTime' on type 'RequestData' is required but missing or empty. Expected: string, Actual: undefined\"\r\n" +
+ " },\r\n" +
+ " {\r\n" +
+ " \"index\": 1,\r\n" +
+ " \"statusCode\": 500,\r\n" +
+ " \"message\": \"Internal Server Error\"\r\n" +
+ " },\r\n" +
+ " {\r\n" +
+ " \"index\": 22,\r\n" +
+ " \"statusCode\": 439,\r\n" +
+ " \"message\": \"Too many requests\"\r\n" +
+ " }\r\n" +
+ " ]\r\n" +
+ "}";
+ boolean result = generateTransmissionWithPartialResult(validResult);
+ Assert.assertTrue(result);
+ }
+
+ @Test
+ public void pass206Status() {
+ String validResult = "{\r\n" +
+ " \"itemsReceived\": 4,\r\n" +
+ " \"itemsAccepted\": 1,\r\n" +
+ " \"errors\": [\r\n" +
+ " {\r\n" +
+ " \"index\": 0,\r\n" +
+ " \"statusCode\": 400,\r\n" +
+ " \"message\": \"109: Field 'startTime' on type 'RequestData' is required but missing or empty. Expected: string, Actual: undefined\"\r\n" +
+ " },\r\n" +
+ " {\r\n" +
+ " \"index\": 1,\r\n" +
+ " \"statusCode\": 500,\r\n" +
+ " \"message\": \"Internal Server Error\"\r\n" +
+ " },\r\n" +
+ " {\r\n" +
+ " \"index\": 2,\r\n" +
+ " \"statusCode\": 439,\r\n" +
+ " \"message\": \"Too many requests\"\r\n" +
+ " }\r\n" +
+ " ]\r\n" +
+ "}";
+ boolean result = generateTransmissionWithPartialResult(validResult);
+ Assert.assertTrue(result);
+ }
+
+ @Test
+ public void passSingleItemArrayList() {
+ TransmissionPolicyManager tpm = new TransmissionPolicyManager(true);
+ TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class);
+ TransmissionHandlerArgs args = new TransmissionHandlerArgs();
+ args.setResponseCode(206);
+ args.setTransmission(new Transmission(fourItems, "application/x-json-stream", "gzip"));
+ args.setTransmissionDispatcher(mockedDispatcher);
+ PartialSuccessHandler eh = new PartialSuccessHandler(tpm);
+ List singleItem = new ArrayList();
+ singleItem.add("{\"ver\":1,\"name\":\"Microsoft.ApplicationInsights.b69a3a06e25a425ba1a44e9ff6f13582.Event\",\"time\":\"2018-02-11T16:02:36.120-0500\",\"sampleRate\":100.0,\"iKey\":\"b69a3a06-e25a-425b-a1a4-4e9ff6f13582\",\"tags\":{\"ai.internal.sdkVersion\":\"java:2.0.0-beta-snapshot\",\"ai.device.id\":\"test.machine.name\",\"ai.device.locale\":\"en-US\",\"ai.internal.nodename\":\"test.machine.name\",\"ai.device.os\":\"Windows 10\",\"ai.device.roleInstance\":\"test.machine.name\",\"ai.device.osVersion\":\"Windows 10\",\"ai.session.id\":\"20180211160233\"},\"data\":{\"baseType\":\"EventData\",\"baseData\":{\"ver\":2,\"name\":\"TestEvent0\",\"properties\":null}}}\r\n");
+ boolean result = eh.sendNewTransmission(args, singleItem);
+ Assert.assertTrue(result);
+ }
+
+ @Test
+ public void passGenerateOriginalItemsGZIP() {
+ TransmissionPolicyManager tpm = new TransmissionPolicyManager(true);
+ TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class);
+ TransmissionHandlerArgs args = new TransmissionHandlerArgs();
+ args.setResponseCode(206);
+ args.setTransmission(new Transmission(fourItems, "application/x-json-stream", "gzip"));
+ args.setTransmissionDispatcher(mockedDispatcher);
+ PartialSuccessHandler eh = new PartialSuccessHandler(tpm);
+ List originalItems = eh.generateOriginalItems(args);
+ Assert.assertEquals(4, originalItems.size());
+ }
+
+ @Test
+ public void passGenerateOriginalItemsNonGZIP() {
+ TransmissionPolicyManager tpm = new TransmissionPolicyManager(true);
+ TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class);
+ TransmissionHandlerArgs args = new TransmissionHandlerArgs();
+ args.setResponseCode(206);
+ args.setTransmission(new Transmission(fourItemsNonGZIP.getBytes(), "application/json", "utf8"));
+ args.setTransmissionDispatcher(mockedDispatcher);
+ PartialSuccessHandler eh = new PartialSuccessHandler(tpm);
+ List originalItems = eh.generateOriginalItems(args);
+ Assert.assertEquals(4, originalItems.size());
+ }
+
+}
diff --git a/core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/ThrottlingHandlerTest.java b/core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/ThrottlingHandlerTest.java
new file mode 100644
index 00000000000..05c1963ed26
--- /dev/null
+++ b/core/src/test/java/com/microsoft/applicationinsights/internal/channel/common/ThrottlingHandlerTest.java
@@ -0,0 +1,145 @@
+package com.microsoft.applicationinsights.internal.channel.common;
+
+import org.apache.http.message.BasicHeader;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.microsoft.applicationinsights.internal.channel.TransmissionDispatcher;
+import com.microsoft.applicationinsights.internal.channel.TransmissionHandlerArgs;
+import com.microsoft.applicationinsights.internal.channel.common.ErrorHandler;
+import com.microsoft.applicationinsights.internal.channel.common.ThrottlingHandler;
+import com.microsoft.applicationinsights.internal.channel.common.Transmission;
+import com.microsoft.applicationinsights.internal.channel.common.TransmissionPolicyManager;
+
+public class ThrottlingHandlerTest {
+
+ private final static String RESPONSE_THROTTLING_HEADER = "Retry-After";
+
+ private boolean generateTransmissionWithStatusCode(int code) {
+ TransmissionPolicyManager tpm = new TransmissionPolicyManager(true);
+ TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class);
+ TransmissionHandlerArgs args = new TransmissionHandlerArgs();
+ args.setResponseCode(code);
+ args.setTransmission(new Transmission(new byte[] { 0 }, "testcontent", "testencoding"));
+ args.setTransmissionDispatcher(mockedDispatcher);
+ ThrottlingHandler eh = new ThrottlingHandler(tpm);
+ boolean result = eh.validateTransmissionAndSend(args);
+ return result;
+ }
+
+ private boolean generateTransmissionWithStatusCodeAndHeader(int code, String retryHeader) {
+ TransmissionPolicyManager tpm = new TransmissionPolicyManager(true);
+ TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class);
+ TransmissionHandlerArgs args = new TransmissionHandlerArgs();
+ args.setResponseCode(code);
+ args.setTransmission(new Transmission(new byte[] { 0 }, "testcontent", "testencoding"));
+ args.setTransmissionDispatcher(mockedDispatcher);
+ args.setRetryHeader(new BasicHeader(RESPONSE_THROTTLING_HEADER, retryHeader));
+ ThrottlingHandler eh = new ThrottlingHandler(tpm);
+ boolean result = eh.validateTransmissionAndSend(args);
+ return result;
+ }
+
+ @Test
+ public void failOnNull() {
+ TransmissionPolicyManager tpm = new TransmissionPolicyManager(true);
+ TransmissionHandlerArgs args = new TransmissionHandlerArgs();
+ ErrorHandler eh = new ErrorHandler(tpm);
+ boolean result = eh.validateTransmissionAndSend(args);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail200Status() {
+ boolean result = generateTransmissionWithStatusCode(200);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail206Status() {
+ boolean result = generateTransmissionWithStatusCode(206);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail400Status() {
+ boolean result = generateTransmissionWithStatusCode(400);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail404Status() {
+ boolean result = generateTransmissionWithStatusCode(404);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail408Status() {
+ boolean result = generateTransmissionWithStatusCode(408);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void pass500Status() {
+ boolean result = generateTransmissionWithStatusCode(500);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail503Status() {
+ boolean result = generateTransmissionWithStatusCode(503);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void failException() {
+ TransmissionPolicyManager tpm = new TransmissionPolicyManager(true);
+ TransmissionDispatcher mockedDispatcher = Mockito.mock(TransmissionDispatcher.class);
+ TransmissionHandlerArgs args = new TransmissionHandlerArgs();
+ args.setResponseCode(0);
+ args.setTransmission(null);
+ args.setTransmissionDispatcher(mockedDispatcher);
+ args.setException(new Exception("Mocked"));
+ ThrottlingHandler eh = new ThrottlingHandler(tpm);
+ boolean result = eh.validateTransmissionAndSend(args);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail429StatusNoRetryHeader() {
+ boolean result = generateTransmissionWithStatusCode(429);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void fail439StatusNoRetryHeader() {
+ boolean result = generateTransmissionWithStatusCode(439);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void pass429StatusBadValue() {
+ boolean result = generateTransmissionWithStatusCodeAndHeader(429, "3600");
+ Assert.assertTrue(result);
+ }
+
+ @Test
+ public void pass429StatusGoodValue() {
+ boolean result = generateTransmissionWithStatusCodeAndHeader(429, "Sun, 11 Feb 2018 16:51:18 GMT");
+ Assert.assertTrue(result);
+ }
+
+ @Test
+ public void pass439StatusBadValue() {
+ boolean result = generateTransmissionWithStatusCodeAndHeader(439, "3600");
+ Assert.assertTrue(result);
+ }
+
+ @Test
+ public void pass439StatusGoodValue() {
+ boolean result = generateTransmissionWithStatusCodeAndHeader(439, "Sun, 11 Feb 2018 16:51:18 GMT");
+ Assert.assertTrue(result);
+ }
+
+}
diff --git a/core/src/test/java/com/microsoft/applicationinsights/internal/channel/inprocess/InProcessTelemetryChannelTest.java b/core/src/test/java/com/microsoft/applicationinsights/internal/channel/inprocess/InProcessTelemetryChannelTest.java
index 0619790d0ed..eed12e3e5e2 100644
--- a/core/src/test/java/com/microsoft/applicationinsights/internal/channel/inprocess/InProcessTelemetryChannelTest.java
+++ b/core/src/test/java/com/microsoft/applicationinsights/internal/channel/inprocess/InProcessTelemetryChannelTest.java
@@ -29,6 +29,8 @@
public class InProcessTelemetryChannelTest {
private final static String NON_VALID_URL = "http:sd{@~fsd.s.d.f;fffff";
+ private final static String INSTANT_RETRY_NAME = "MaxInstantRetry";
+ private final static int DEFAULT_MAX_INSTANT_RETRY = 3;
@Test(expected = IllegalArgumentException.class)
public void testNotValidEndpointAddressAsMapValue() {
@@ -36,4 +38,24 @@ public void testNotValidEndpointAddressAsMapValue() {
map.put("EndpointAddress", NON_VALID_URL);
new InProcessTelemetryChannel(map);
}
+ @Test()
+ public void testStringIntegerMaxInstanceRetry() {
+ HashMap map = new HashMap();
+ map.put(INSTANT_RETRY_NAME, "AABB");
+ new InProcessTelemetryChannel(map);
+ }
+
+ @Test()
+ public void testValidIntegerMaxInstanceRetry() {
+ HashMap map = new HashMap();
+ map.put(INSTANT_RETRY_NAME, "4");
+ new InProcessTelemetryChannel(map);
+ }
+
+ @Test()
+ public void testInvalidIntegerMaxInstanceRetry() {
+ HashMap map = new HashMap();
+ map.put(INSTANT_RETRY_NAME, "-1");
+ new InProcessTelemetryChannel(map);
+ }
}
\ No newline at end of file
diff --git a/test/performance/src/main/java/com/microsoft/applicationinsights/core/volume/ThroughputTestTransmitterFactory.java b/test/performance/src/main/java/com/microsoft/applicationinsights/core/volume/ThroughputTestTransmitterFactory.java
index be62d49b0a7..19aa8f1a6f1 100644
--- a/test/performance/src/main/java/com/microsoft/applicationinsights/core/volume/ThroughputTestTransmitterFactory.java
+++ b/test/performance/src/main/java/com/microsoft/applicationinsights/core/volume/ThroughputTestTransmitterFactory.java
@@ -34,11 +34,22 @@
final class ThroughputTestTransmitterFactory implements TransmitterFactory {
@Override
public TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled) {
+ return create(endpoint, maxTransmissionStorageCapacity, throttlingIsEnabled, 3);
+ }
+
+ @Override
+ public TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled, int maxInstanceRetries) {
// An active object with the network sender
TransmissionOutput actualNetworkSender = TestThreadLocalData.getTransmissionOutput();
final TransmissionPolicyManager transmissionPolicyManager = new TransmissionPolicyManager(throttlingIsEnabled);
+ transmissionPolicyManager.addTransmissionHandler(new ErrorHandler(transmissionPolicyManager));
+ transmissionPolicyManager.addTransmissionHandler(new PartialSuccessHandler(transmissionPolicyManager));
+ transmissionPolicyManager.addTransmissionHandler(new ThrottlingHandler(transmissionPolicyManager));
+ transmissionPolicyManager.setMaxInstantRetries(maxInstanceRetries);
+
TransmissionPolicyStateFetcher stateFetcher = transmissionPolicyManager.getTransmissionPolicyState();
TransmissionOutput networkSender = new ActiveTransmissionNetworkOutput(actualNetworkSender, stateFetcher);
+
// An active object with the file system sender
TransmissionFileSystemOutput fileSystemSender = new TransmissionFileSystemOutput();