Skip to content

Commit

Permalink
Implementing Retry logic [Reliable Channel] [STABLE Branch] (#561)
Browse files Browse the repository at this point in the history
* Initial commit of retry and backoff logic fixes

* Fixing warnings on files I touched this round

* Fix the eclipse UI from screaming about the docker Contstants

* Fixed backoff logic to use existing method. Added more logging to the sender channel.

* Added the partial response handler, more logging

* Added gson to core. Fixed backoff manager to keep original functionality. Added extension to return the timeout values as expected before.

* Added unit tests.

* Fixing string typed ArrayList<> to List<> per Dhaval

* Missed one

* Making tests consistent.

* Added javadoc comments, simplified logic for a few methods

* Added exception logging per @dhaval24. Fixed formatting on touched files

* Updates per last round of commits

Moved the Handlers out of the concrete package to the common package to keep the same consistency.  Removed a couple of unessecary methods. Added docs.

* Latest fixes

* Add MaxInstantRetry

Added MaxInstantRetry configuration to allow for instantaneous retry on a failed transmission.

* Javadoc Updates

Javadoc and formatting updates

* NumberFormatException fix

Added null check

* JavaDocs for TPM
  • Loading branch information
debugthings authored and grlima committed Feb 16, 2018
1 parent 94c7af7 commit 1fad688
Show file tree
Hide file tree
Showing 26 changed files with 1,844 additions and 639 deletions.
2 changes: 2 additions & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ dependencies {
compile ([group: 'org.apache.commons', name: 'commons-lang3', version: '3.7'])
compile ([group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.3'])
compile ([group: 'com.google.guava', name: 'guava', version: '20.0'])
compile ([group: 'com.google.code.gson', name: 'gson', version: '2.8.2'])
testCompile group: 'junit', name: 'junit', version: '4.12'
testCompile group: 'org.mockito', name: 'mockito-all', version: '1.10.19'
testCompile group: 'com.google.code.gson', name: 'gson', version: '2.8.2'
Expand All @@ -56,6 +57,7 @@ shadowJar {
relocate 'org.apache.commons', 'com.microsoft.applicationinsights.core.dependencies.apachecommons'
relocate 'com.google.common', 'com.microsoft.applicationinsights.core.dependencies.googlecommon'
relocate 'javax.annotation', 'com.microsoft.applicationinsights.core.dependencies.javaxannotation'
relocate 'com.google.gson', 'com.microsoft.applicationinsights.core.dependencies.gson'
}

jar {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,18 @@
* Created by gupele on 1/15/2015.
*/
final class InProcessTelemetryChannelFactory implements TransmitterFactory {
private final int DEFAULT_RETRY = 3;
@Override
public TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled) {
return create(endpoint, maxTransmissionStorageCapacity, throttlingIsEnabled, DEFAULT_RETRY);
}
@Override
public TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled) {
final TransmissionPolicyManager transmissionPolicyManager = new TransmissionPolicyManager(throttlingIsEnabled);

public TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled, int maxInstantRetries) {
final TransmissionPolicyManager transmissionPolicyManager = new TransmissionPolicyManager(throttlingIsEnabled);
transmissionPolicyManager.addTransmissionHandler(new ErrorHandler(transmissionPolicyManager));
transmissionPolicyManager.addTransmissionHandler(new PartialSuccessHandler(transmissionPolicyManager));
transmissionPolicyManager.addTransmissionHandler(new ThrottlingHandler(transmissionPolicyManager));
transmissionPolicyManager.setMaxInstantRetries(maxInstantRetries);
// An active object with the network sender
TransmissionNetworkOutput actualNetworkSender = TransmissionNetworkOutput.create(endpoint, transmissionPolicyManager);

Expand All @@ -51,6 +59,7 @@ public TelemetriesTransmitter create(String endpoint, String maxTransmissionStor
// The dispatcher works with the two active senders
TransmissionDispatcher dispatcher = new NonBlockingDispatcher(new TransmissionOutput[] {networkSender, activeFileSystemOutput});
actualNetworkSender.setTransmissionDispatcher(dispatcher);


// The loader works with the file system loader as the active one does
TransmissionsLoader transmissionsLoader = new ActiveTransmissionLoader(fileSystemSender, stateFetcher, dispatcher);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.microsoft.applicationinsights.internal.channel;

/**
* An interface that is used to create a concrete class that is called by the the {@link TransmissionHandlerObserver}
* <p>
* This is used to implement classes like {@link ErrorHandler} and {@link PartialSuccessHandler}.
* @author jamdavi
*
*
*/
public interface TransmissionHandler {
/**
* Called when a transmission is sent by the {@link TransmissionOutput}.
* @param args The {@link TransmissionHandlerArgs} for this handler.
*/
void onTransmissionSent(TransmissionHandlerArgs args);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package com.microsoft.applicationinsights.internal.channel;

import org.apache.http.Header;

import com.microsoft.applicationinsights.internal.channel.common.Transmission;

/**
* This class is used to store information between the transmission sender and the transmission handlers
* <p>
* An example class that uses this are {@link ErrorHandler}
* @author jamdavi
*
*/
public class TransmissionHandlerArgs {
private String responseBody;
/**
* Set the response body.
* @param body The HTTP Response from the sender
*/
public void setResponseBody(String body) { this.responseBody = body;}
/**
* Get the response body
* @return The HTTP Response from the sender
*/
public String getResponseBody() { return this.responseBody;}


private TransmissionDispatcher transmissionDispatcher;
/**
* Set the {@link TransmissionDispatcher} used by the sender
* @param dispatcher The {@link TransmissionDispatcher} used by the sender
*/
public void setTransmissionDispatcher(TransmissionDispatcher dispatcher) { this.transmissionDispatcher = dispatcher;}
/**
* Get the {@link TransmissionDispatcher} used by the sender
* @return The {@link TransmissionDispatcher} used by the sender
*/
public TransmissionDispatcher getTransmissionDispatcher() { return this.transmissionDispatcher;}

private Transmission transmission;
/**
* Set the transmission that needs to be passed to the handler.
* @param transmission The transmission that needs to be passed to the handler.
*/
public void setTransmission(Transmission transmission) { this.transmission = transmission;}
/**
* Get the transmission that needs to be passed to the handler.
* @return The transmission used by the handler.
*/
public Transmission getTransmission() { return this.transmission;}

private int responseCode;
/**
* Set the response code to be passed to the handler.
* @param code The HTTP response code.
*/
public void setResponseCode(int code) { this.responseCode = code;}
/**
* Get the response code for the handler to use.
* @return The HTTP response code.
*/
public int getResponseCode() { return this.responseCode;}

private Throwable exception;
/**
* Set the exception thrown by the sender to be passed the handler.
* @param ex The exception
*/
public void setException(Throwable ex) { this.exception = ex;}
/**
* Get the exception thrown by the sender to be used by the handler.
* @return The exception
*/
public Throwable getException() { return this.exception;}

private Header retryHeader;
/**
* Set the Retry-After header to be passed to the handler.
* @param head The Retry-After header
*/
public void setRetryHeader(Header head) { this.retryHeader = head;}
/**
* Get the Retry-After header to be passed to the handler.
* @return The Retry-After header
*/
public Header getRetryHeader() { return this.retryHeader;}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.microsoft.applicationinsights.internal.channel;


/**
* Enables the {@link TransmissionPolicyManager} to handle transmission states.
* <p>
* This interface extends {@TransmissionHandler} to add the ability to observe when the transmission is completed.
* @author jamdavi
*
*/
public interface TransmissionHandlerObserver extends TransmissionHandler {

/**
* Used to add a {@link TransmissionHandler} to the collection stored by the {@link TransmissionPolicyManager}
* @param handler The handler to add to the collection.
*/
void addTransmissionHandler(TransmissionHandler handler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,21 @@
* Created by gupele on 12/21/2014.
*/
public interface TransmitterFactory {
/**
* Creates the {@link TelemetriesTransmitter} for use by the {@link TelemetryChannel}
* @param endpoint HTTP Endpoint to send telemetry to
* @param maxTransmissionStorageCapacity Max amount of disk space in KB for persistent storage to use
* @param throttlingIsEnabled Allow the network telemetry sender to be throttled
* @return The {@link TelemetriesTransmitter} object
*/
TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled);
/**
* Creates the {@link TelemetriesTransmitter} for use by the {@link TelemetryChannel}
* @param endpoint HTTP Endpoint to send telemetry to
* @param maxTransmissionStorageCapacity Max amount of disk space in KB for persistent storage to use
* @param throttlingIsEnabled Allow the network telemetry sender to be throttled
* @param maxInstantRetries Number of instant retries in case of a temporary network outage
* @return The {@link TelemetriesTransmitter} object
*/
TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled, int maxInstantRetries);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.base.Preconditions;
import com.microsoft.applicationinsights.internal.channel.TransmissionDispatcher;
import com.microsoft.applicationinsights.internal.channel.TransmissionsLoader;
import com.microsoft.applicationinsights.internal.logger.InternalLogger;

import com.google.common.base.Preconditions;
import com.microsoft.applicationinsights.internal.shutdown.Stoppable;

/**
* The class is responsible for loading transmission files that were saved to the disk
*
Expand Down Expand Up @@ -108,7 +106,7 @@ public void run() {
case UNBLOCKED:
fetchNext(true);
break;

case BACKOFF:
case BLOCKED_BUT_CAN_BE_PERSISTED:
Thread.sleep(DEFAULT_SLEEP_INTERVAL_AFTER_DISPATCHING_IN_MILLS);
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.microsoft.applicationinsights.internal.channel.common;

/**
* Utility class used by the {@link PartialSuccessHandler}
*
* @author jamdavi
*
*/
class BackendResponse {

int itemsReceived;
int itemsAccepted;
Error[] errors;

class Error {
public int index;
public int statusCode;
public String message;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.microsoft.applicationinsights.internal.channel.common;

import com.microsoft.applicationinsights.internal.channel.TransmissionHandler;
import com.microsoft.applicationinsights.internal.channel.TransmissionHandlerArgs;
import com.microsoft.applicationinsights.internal.logger.InternalLogger;

/**
* This class implements the retry logic for transmissions with the results of a
* 408, 500, and 503 result.
* <p>
* It does not handle any error codes such as 400, 401, 403, 404, etc.
*
* @author jamdavi
*
*/
public class ErrorHandler implements TransmissionHandler {

private TransmissionPolicyManager transmissionPolicyManager;

/**
* Ctor
*
* Constructs the ErrorHandler object.
*
* @param policy
* The {@link TransmissionPolicyManager} object that is needed to
* control the back off policy
*/
public ErrorHandler(TransmissionPolicyManager policy) {
this.transmissionPolicyManager = policy;
}

@Override
public void onTransmissionSent(TransmissionHandlerArgs args) {

validateTransmissionAndSend(args);
}

boolean validateTransmissionAndSend(TransmissionHandlerArgs args) {
if (args.getTransmission() != null && args.getTransmissionDispatcher() != null) {
args.getTransmission().incrementNumberOfSends();
switch (args.getResponseCode()) {
case TransmissionSendResult.REQUEST_TIMEOUT:
case TransmissionSendResult.INTERNAL_SERVER_ERROR:
case TransmissionSendResult.SERVICE_UNAVAILABLE:
backoffAndSendTransmission(args);
return true;
default:
InternalLogger.INSTANCE.trace("Http response code %s not handled by %s", args.getResponseCode(),
this.getClass().getName());
return false;
}
} else if (args.getException() != null) {
backoffAndSendTransmission(args);
return true;
}
return false;
}

private void backoffAndSendTransmission(TransmissionHandlerArgs args) {
// It is possible for us to have a temporary blip in transmission
// this setting will allow us to control how many instant retries we perform
// before backing off the send
if (args.getTransmission() != null && (args.getTransmission().getNumberOfSends() > transmissionPolicyManager.getMaxInstantRetries()))
{
this.transmissionPolicyManager.backoff();
}
args.getTransmissionDispatcher().dispatch(args.getTransmission());
}
}
Loading

0 comments on commit 1fad688

Please sign in to comment.