Skip to content

Commit

Permalink
Merge pull request #189 from North-Two-Five/configurable-retry-count
Browse files Browse the repository at this point in the history
Configurable retry count
  • Loading branch information
lubird authored Jan 22, 2021
2 parents 9fb9c69 + 2cb8c32 commit 70fae37
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 21 deletions.
19 changes: 15 additions & 4 deletions analytics/src/main/java/com/segment/analytics/Analytics.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public static class Builder {
private ExecutorService networkExecutor;
private ThreadFactory threadFactory;
private int flushQueueSize;
private int maximumFlushAttempts;
private long flushIntervalInMillis;
private List<Callback> callbacks;

Expand Down Expand Up @@ -223,6 +224,15 @@ public Builder flushInterval(long flushInterval, TimeUnit unit) {
return this;
}

/** Set how many retries should happen before getting exhausted */
public Builder retries(int maximumRetries) {
if (maximumRetries < 1) {
throw new IllegalArgumentException("retries must be at least 1");
}
this.maximumFlushAttempts = maximumRetries;
return this;
}

/** Set the {@link ExecutorService} on which all HTTP requests will be made. */
public Builder networkExecutor(ExecutorService networkExecutor) {
if (networkExecutor == null) {
Expand Down Expand Up @@ -275,12 +285,12 @@ public Analytics build() {
.registerTypeAdapter(Date.class, new ISO8601DateAdapter()) //
.create();

if (endpoint == null && uploadURL == null) {
if (endpoint == null) {
endpoint = DEFAULT_ENDPOINT;
}

if (endpoint == null && uploadURL != null) {
endpoint = uploadURL;
if (uploadURL != null) {
endpoint = uploadURL;
}
}

if (client == null) {
Expand Down Expand Up @@ -349,6 +359,7 @@ public void log(String message) {
segmentService,
flushQueueSize,
flushIntervalInMillis,
maximumFlushAttempts,
log,
threadFactory,
networkExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class AnalyticsClient {
private final BlockingQueue<Message> messageQueue;
private final SegmentService service;
private final int size;
private final int maximumRetries;
private final Log log;
private final List<Callback> callbacks;
private final ExecutorService networkExecutor;
Expand All @@ -56,6 +57,7 @@ public static AnalyticsClient create(
SegmentService segmentService,
int flushQueueSize,
long flushIntervalInMillis,
int maximumRetries,
Log log,
ThreadFactory threadFactory,
ExecutorService networkExecutor,
Expand All @@ -65,6 +67,7 @@ public static AnalyticsClient create(
segmentService,
flushQueueSize,
flushIntervalInMillis,
maximumRetries,
log,
threadFactory,
networkExecutor,
Expand All @@ -77,6 +80,7 @@ public static AnalyticsClient create(
SegmentService service,
int maxQueueSize,
long flushIntervalInMillis,
int maximumRetries,
Log log,
ThreadFactory threadFactory,
ExecutorService networkExecutor,
Expand All @@ -85,6 +89,7 @@ public static AnalyticsClient create(
this.messageQueue = messageQueue;
this.service = service;
this.size = maxQueueSize;
this.maximumRetries = maximumRetries;
this.log = log;
this.callbacks = callbacks;
this.looperExecutor = Executors.newSingleThreadExecutor(threadFactory);
Expand Down Expand Up @@ -220,7 +225,8 @@ public void run() {
"Batching %s message(s) into batch %s.",
messages.size(),
batch.sequence());
networkExecutor.submit(BatchUploadTask.create(AnalyticsClient.this, batch));
networkExecutor.submit(
BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries));
messages = new ArrayList<>();
}
}
Expand All @@ -244,15 +250,17 @@ static class BatchUploadTask implements Runnable {
private final AnalyticsClient client;
private final Backo backo;
final Batch batch;
private final int maxRetries;

static BatchUploadTask create(AnalyticsClient client, Batch batch) {
return new BatchUploadTask(client, BACKO, batch);
static BatchUploadTask create(AnalyticsClient client, Batch batch, int maxRetries) {
return new BatchUploadTask(client, BACKO, batch, maxRetries);
}

BatchUploadTask(AnalyticsClient client, Backo backo, Batch batch) {
BatchUploadTask(AnalyticsClient client, Backo backo, Batch batch, int maxRetries) {
this.client = client;
this.batch = batch;
this.backo = backo;
this.maxRetries = maxRetries;
}

private void notifyCallbacksWithException(Batch batch, Exception exception) {
Expand Down Expand Up @@ -314,7 +322,8 @@ boolean upload() {

@Override
public void run() {
for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
int attempt = 0;
for (; attempt <= maxRetries; attempt++) {
boolean retry = upload();
if (!retry) return;
try {
Expand All @@ -327,7 +336,8 @@ public void run() {
}

client.log.print(ERROR, "Could not upload batch %s. Retries exhausted.", batch.sequence());
notifyCallbacksWithException(batch, new IOException(MAX_ATTEMPTS + " retries exhausted"));
notifyCallbacksWithException(
batch, new IOException(Integer.toString(attempt) + " retries exhausted"));
}

private static boolean is5xx(int status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,23 @@ public void nullLog() {
}
}

@Test
public void negativeRetryCount() {
try {
builder.retries(0);
fail("Should fail for retries less than 1");
} catch (IllegalArgumentException e) {
assertThat(e).hasMessage("retries must be at least 1");
}

try {
builder.retries(-1);
fail("Should fail for retries less than 1");
} catch (IllegalArgumentException e) {
assertThat(e).hasMessage("retries must be at least 1");
}
}

@Test
public void nullTransformer() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ public class AnalyticsClientTest {
private static final Backo BACKO =
Backo.builder().base(TimeUnit.NANOSECONDS, 1).factor(1).build();

Log log = Log.NONE;
private int DEFAULT_RETRIES = 10;

Log log = Log.NONE;
ThreadFactory threadFactory;
BlockingQueue<Message> messageQueue;
@Mock BlockingQueue<Message> messageQueue;
@Mock SegmentService segmentService;
@Mock ExecutorService networkExecutor;
@Mock Callback callback;
Expand All @@ -87,6 +88,7 @@ AnalyticsClient newClient() {
segmentService,
50,
TimeUnit.HOURS.toMillis(1),
DEFAULT_RETRIES,
log,
threadFactory,
networkExecutor,
Expand Down Expand Up @@ -244,7 +246,7 @@ public void batchRetriesForNetworkErrors() {
.thenReturn(Calls.response(failureResponse))
.thenReturn(Calls.response(successResponse));

BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch);
BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES);
batchUploadTask.run();

// Verify that we tried to upload 4 times, 3 failed and 1 succeeded.
Expand All @@ -269,7 +271,7 @@ public void batchRetriesForHTTP5xxErrors() {
.thenReturn(Calls.response(failResponse))
.thenReturn(Calls.response(successResponse));

BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch);
BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES);
batchUploadTask.run();

// Verify that we tried to upload 4 times, 3 failed and 1 succeeded.
Expand All @@ -293,7 +295,7 @@ public void batchRetriesForHTTP429Errors() {
.thenReturn(Calls.response(failResponse))
.thenReturn(Calls.response(successResponse));

BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch);
BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES);
batchUploadTask.run();

// Verify that we tried to upload 4 times, 3 failed and 1 succeeded.
Expand All @@ -312,7 +314,7 @@ public void batchDoesNotRetryForNon5xxAndNon429HTTPErrors() {
Response.error(404, ResponseBody.create(null, "Not Found"));
when(segmentService.upload(batch)).thenReturn(Calls.response(failResponse));

BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch);
BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES);
batchUploadTask.run();

// Verify we only tried to upload once.
Expand All @@ -329,7 +331,7 @@ public void batchDoesNotRetryForNonNetworkErrors() {
Call<UploadResponse> networkFailure = Calls.failure(new RuntimeException());
when(segmentService.upload(batch)).thenReturn(networkFailure);

BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch);
BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES);
batchUploadTask.run();

// Verify we only tried to upload once.
Expand All @@ -353,19 +355,20 @@ public Call<UploadResponse> answer(InvocationOnMock invocation) {
}
});

BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch);
BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 10);
batchUploadTask.run();

// 50 == MAX_ATTEMPTS in AnalyticsClient.java
verify(segmentService, times(50)).upload(batch);
// DEFAULT_RETRIES == maxRetries
// tries 11(one normal run + 10 retries) even though default is 50 in AnalyticsClient.java
verify(segmentService, times(11)).upload(batch);
verify(callback)
.failure(
eq(trackMessage),
argThat(
new ArgumentMatcher<IOException>() {
@Override
public boolean matches(IOException exception) {
return exception.getMessage().equals("50 retries exhausted");
return exception.getMessage().equals("11 retries exhausted");
}
}));
}
Expand Down Expand Up @@ -455,4 +458,37 @@ public void shutdownWithMessagesInTheQueue(MessageBuilderTest builder)
verify(networkExecutor).awaitTermination(1, TimeUnit.SECONDS);
verify(networkExecutor).submit(any(AnalyticsClient.BatchUploadTask.class));
}

@Test
public void neverRetries() {
AnalyticsClient client = newClient();
TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build();
Batch batch = batchFor(trackMessage);

when(segmentService.upload(batch))
.thenAnswer(
new Answer<Call<UploadResponse>>() {
public Call<UploadResponse> answer(InvocationOnMock invocation) {
Response<UploadResponse> failResponse =
Response.error(429, ResponseBody.create(null, "Not Found"));
return Calls.response(failResponse);
}
});

BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 0);
batchUploadTask.run();

// runs once but never retries
verify(segmentService, times(1)).upload(batch);
verify(callback)
.failure(
eq(trackMessage),
argThat(
new ArgumentMatcher<IOException>() {
@Override
public boolean matches(IOException exception) {
return exception.getMessage().equals("1 retries exhausted");
}
}));
}
}

0 comments on commit 70fae37

Please sign in to comment.