Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing Retry logic [Reliable Channel] [STABLE Branch] #561

Merged
merged 18 commits into from
Feb 16, 2018
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ dependencies {
compile ([group: 'org.apache.commons', name: 'commons-lang3', version: '3.7'])
compile ([group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.3'])
compile ([group: 'com.google.guava', name: 'guava', version: '20.0'])
compile ([group: 'com.google.code.gson', name: 'gson', version: '2.8.2'])
testCompile group: 'junit', name: 'junit', version: '4.12'
testCompile group: 'org.mockito', name: 'mockito-all', version: '1.10.19'
testCompile group: 'com.google.code.gson', name: 'gson', version: '2.8.2'
Expand All @@ -56,6 +57,7 @@ shadowJar {
relocate 'org.apache.commons', 'com.microsoft.applicationinsights.core.dependencies.apachecommons'
relocate 'com.google.common', 'com.microsoft.applicationinsights.core.dependencies.googlecommon'
relocate 'javax.annotation', 'com.microsoft.applicationinsights.core.dependencies.javaxannotation'
relocate 'com.google.gson', 'com.microsoft.applicationinsights.core.dependencies.gson'
}

jar {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,18 @@
* Created by gupele on 1/15/2015.
*/
final class InProcessTelemetryChannelFactory implements TransmitterFactory {
private final int DEFAULT_RETRY = 3;
@Override
public TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled) {
return create(endpoint, maxTransmissionStorageCapacity, throttlingIsEnabled, DEFAULT_RETRY);
}
@Override
public TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled) {
final TransmissionPolicyManager transmissionPolicyManager = new TransmissionPolicyManager(throttlingIsEnabled);

public TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled, int maxInstantRetries) {
final TransmissionPolicyManager transmissionPolicyManager = new TransmissionPolicyManager(throttlingIsEnabled);
transmissionPolicyManager.addTransmissionHandler(new ErrorHandler(transmissionPolicyManager));
transmissionPolicyManager.addTransmissionHandler(new PartialSuccessHandler(transmissionPolicyManager));
transmissionPolicyManager.addTransmissionHandler(new ThrottlingHandler(transmissionPolicyManager));
transmissionPolicyManager.setMaxInstantRetries(maxInstantRetries);
// An active object with the network sender
TransmissionNetworkOutput actualNetworkSender = TransmissionNetworkOutput.create(endpoint, transmissionPolicyManager);

Expand All @@ -51,6 +59,7 @@ public TelemetriesTransmitter create(String endpoint, String maxTransmissionStor
// The dispatcher works with the two active senders
TransmissionDispatcher dispatcher = new NonBlockingDispatcher(new TransmissionOutput[] {networkSender, activeFileSystemOutput});
actualNetworkSender.setTransmissionDispatcher(dispatcher);


// The loader works with the file system loader as the active one does
TransmissionsLoader transmissionsLoader = new ActiveTransmissionLoader(fileSystemSender, stateFetcher, dispatcher);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.microsoft.applicationinsights.internal.channel;

/**
* An interface that is used to create a concrete class that is called by the the {@link TransmissionHandlerObserver}
* <p>
* This is used to implement classes like {@link ErrorHandler} and {@link PartialSuccessHandler}.
* @author jamdavi
*
*
*/
public interface TransmissionHandler {
/**
* Called when a transmission is sent by the {@link TransmissionOutput}.
* @param args The {@link TransmissionHandlerArgs} for this handler.
*/
void onTransmissionSent(TransmissionHandlerArgs args);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package com.microsoft.applicationinsights.internal.channel;

import org.apache.http.Header;

import com.microsoft.applicationinsights.internal.channel.common.Transmission;

/**
* This class is used to store information between the transmission sender and the transmission handlers
* <p>
* An example class that uses this are {@link ErrorHandler}
* @author jamdavi
*
*/
public class TransmissionHandlerArgs {
private String responseBody;
/**
* Set the response body.
* @param body The HTTP Response from the sender
*/
public void setResponseBody(String body) { this.responseBody = body;}
/**
* Get the response body
* @return The HTTP Response from the sender
*/
public String getResponseBody() { return this.responseBody;}


private TransmissionDispatcher transmissionDispatcher;
/**
* Set the {@link TransmissionDispatcher} used by the sender
* @param dispatcher The {@link TransmissionDispatcher} used by the sender
*/
public void setTransmissionDispatcher(TransmissionDispatcher dispatcher) { this.transmissionDispatcher = dispatcher;}
/**
* Get the {@link TransmissionDispatcher} used by the sender
* @return The {@link TransmissionDispatcher} used by the sender
*/
public TransmissionDispatcher getTransmissionDispatcher() { return this.transmissionDispatcher;}

private Transmission transmission;
/**
* Set the transmission that needs to be passed to the handler.
* @param transmission The transmission that needs to be passed to the handler.
*/
public void setTransmission(Transmission transmission) { this.transmission = transmission;}
/**
* Get the transmission that needs to be passed to the handler.
* @return The transmission used by the handler.
*/
public Transmission getTransmission() { return this.transmission;}

private int responseCode;
/**
* Set the response code to be passed to the handler.
* @param code The HTTP response code.
*/
public void setResponseCode(int code) { this.responseCode = code;}
/**
* Get the response code for the handler to use.
* @return The HTTP response code.
*/
public int getResponseCode() { return this.responseCode;}

private Throwable exception;
/**
* Set the exception thrown by the sender to be passed the handler.
* @param ex The exception
*/
public void setException(Throwable ex) { this.exception = ex;}
/**
* Get the exception thrown by the sender to be used by the handler.
* @return The exception
*/
public Throwable getException() { return this.exception;}

private Header retryHeader;
/**
* Set the Retry-After header to be passed to the handler.
* @param head The Retry-After header
*/
public void setRetryHeader(Header head) { this.retryHeader = head;}
/**
* Get the Retry-After header to be passed to the handler.
* @return The Retry-After header
*/
public Header getRetryHeader() { return this.retryHeader;}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.microsoft.applicationinsights.internal.channel;


/**
* Enables the {@link TransmissionPolicyManager} to handle transmission states.
* <p>
* This interface extends {@TransmissionHandler} to add the ability to observe when the transmission is completed.
* @author jamdavi
*
*/
public interface TransmissionHandlerObserver extends TransmissionHandler {

/**
* Used to add a {@link TransmissionHandler} to the collection stored by the {@link TransmissionPolicyManager}
* @param handler The handler to add to the collection.
*/
void addTransmissionHandler(TransmissionHandler handler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,21 @@
* Created by gupele on 12/21/2014.
*/
public interface TransmitterFactory {
/**
* Creates the {@link TelemetriesTransmitter} for use by the {@link TelemetryChannel}
* @param endpoint HTTP Endpoint to send telemetry to
* @param maxTransmissionStorageCapacity Max amount of disk space in KB for persistent storage to use
* @param throttlingIsEnabled Allow the network telemetry sender to be throttled
* @return The {@link TelemetriesTransmitter} object
*/
TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled);
/**
* Creates the {@link TelemetriesTransmitter} for use by the {@link TelemetryChannel}
* @param endpoint HTTP Endpoint to send telemetry to
* @param maxTransmissionStorageCapacity Max amount of disk space in KB for persistent storage to use
* @param throttlingIsEnabled Allow the network telemetry sender to be throttled
* @param maxInstantRetries Number of instant retries in case of a temporary network outage
* @return The {@link TelemetriesTransmitter} object
*/
TelemetriesTransmitter create(String endpoint, String maxTransmissionStorageCapacity, boolean throttlingIsEnabled, int maxInstantRetries);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.base.Preconditions;
import com.microsoft.applicationinsights.internal.channel.TransmissionDispatcher;
import com.microsoft.applicationinsights.internal.channel.TransmissionsLoader;
import com.microsoft.applicationinsights.internal.logger.InternalLogger;

import com.google.common.base.Preconditions;
import com.microsoft.applicationinsights.internal.shutdown.Stoppable;

/**
* The class is responsible for loading transmission files that were saved to the disk
*
Expand Down Expand Up @@ -108,7 +106,7 @@ public void run() {
case UNBLOCKED:
fetchNext(true);
break;

case BACKOFF:
case BLOCKED_BUT_CAN_BE_PERSISTED:
Thread.sleep(DEFAULT_SLEEP_INTERVAL_AFTER_DISPATCHING_IN_MILLS);
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.microsoft.applicationinsights.internal.channel.common;

/**
* Utility class used by the {@link PartialSuccessHandler}
*
* @author jamdavi
*
*/
class BackendResponse {

int itemsReceived;
int itemsAccepted;
Error[] errors;

class Error {
public int index;
public int statusCode;
public String message;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.microsoft.applicationinsights.internal.channel.common;

import com.microsoft.applicationinsights.internal.channel.TransmissionHandler;
import com.microsoft.applicationinsights.internal.channel.TransmissionHandlerArgs;
import com.microsoft.applicationinsights.internal.logger.InternalLogger;

/**
* This class implements the retry logic for transmissions with the results of a
* 408, 500, and 503 result.
* <p>
* It does not handle any error codes such as 400, 401, 403, 404, etc.
*
* @author jamdavi
*
*/
public class ErrorHandler implements TransmissionHandler {

private TransmissionPolicyManager transmissionPolicyManager;

/**
* Ctor
*
* Constructs the ErrorHandler object.
*
* @param policy
* The {@link TransmissionPolicyManager} object that is needed to
* control the back off policy
*/
public ErrorHandler(TransmissionPolicyManager policy) {
this.transmissionPolicyManager = policy;
}

@Override
public void onTransmissionSent(TransmissionHandlerArgs args) {

validateTransmissionAndSend(args);
}

boolean validateTransmissionAndSend(TransmissionHandlerArgs args) {
if (args.getTransmission() != null && args.getTransmissionDispatcher() != null) {
args.getTransmission().incrementNumberOfSends();
switch (args.getResponseCode()) {
case TransmissionSendResult.REQUEST_TIMEOUT:
case TransmissionSendResult.INTERNAL_SERVER_ERROR:
case TransmissionSendResult.SERVICE_UNAVAILABLE:
backoffAndSendTransmission(args);
return true;
default:
InternalLogger.INSTANCE.trace("Http response code %s not handled by %s", args.getResponseCode(),
this.getClass().getName());
return false;
}
} else if (args.getException() != null) {
backoffAndSendTransmission(args);
return true;
}
return false;
}

private void backoffAndSendTransmission(TransmissionHandlerArgs args) {
// It is possible for us to have a temporary blip in transmission
// this setting will allow us to control how many instant retries we perform
// before backing off the send
if (args.getTransmission() != null && (args.getTransmission().getNumberOfSends() > transmissionPolicyManager.getMaxInstantRetries()))
{
this.transmissionPolicyManager.backoff();
}
args.getTransmissionDispatcher().dispatch(args.getTransmission());
}
}
Loading