Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable retry count #189

Merged
merged 5 commits into from
Jan 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions analytics/src/main/java/com/segment/analytics/Analytics.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public static class Builder {
private ExecutorService networkExecutor;
private ThreadFactory threadFactory;
private int flushQueueSize;
private int maximumFlushAttempts;
pbassut marked this conversation as resolved.
Show resolved Hide resolved
private long flushIntervalInMillis;
private List<Callback> callbacks;

Expand Down Expand Up @@ -223,6 +224,15 @@ public Builder flushInterval(long flushInterval, TimeUnit unit) {
return this;
}

/** Set how many retries should happen before getting exhausted */
public Builder retries(int maximumRetries) {
if (maximumRetries < 1) {
throw new IllegalArgumentException("retries must be at least 1");
}
this.maximumFlushAttempts = maximumRetries;
return this;
}

/** Set the {@link ExecutorService} on which all HTTP requests will be made. */
public Builder networkExecutor(ExecutorService networkExecutor) {
if (networkExecutor == null) {
Expand Down Expand Up @@ -275,12 +285,12 @@ public Analytics build() {
.registerTypeAdapter(Date.class, new ISO8601DateAdapter()) //
.create();

if (endpoint == null && uploadURL == null) {
if (endpoint == null) {
endpoint = DEFAULT_ENDPOINT;
}

if (endpoint == null && uploadURL != null) {
endpoint = uploadURL;
if (uploadURL != null) {
endpoint = uploadURL;
}
}

if (client == null) {
Expand Down Expand Up @@ -349,6 +359,7 @@ public void log(String message) {
segmentService,
flushQueueSize,
flushIntervalInMillis,
maximumFlushAttempts,
log,
threadFactory,
networkExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class AnalyticsClient {
private final BlockingQueue<Message> messageQueue;
private final SegmentService service;
private final int size;
private final int maximumRetries;
private final Log log;
private final List<Callback> callbacks;
private final ExecutorService networkExecutor;
Expand All @@ -56,6 +57,7 @@ public static AnalyticsClient create(
SegmentService segmentService,
int flushQueueSize,
long flushIntervalInMillis,
int maximumRetries,
Log log,
ThreadFactory threadFactory,
ExecutorService networkExecutor,
Expand All @@ -65,6 +67,7 @@ public static AnalyticsClient create(
segmentService,
flushQueueSize,
flushIntervalInMillis,
maximumRetries,
log,
threadFactory,
networkExecutor,
Expand All @@ -77,6 +80,7 @@ public static AnalyticsClient create(
SegmentService service,
int maxQueueSize,
long flushIntervalInMillis,
int maximumRetries,
Log log,
ThreadFactory threadFactory,
ExecutorService networkExecutor,
Expand All @@ -85,6 +89,7 @@ public static AnalyticsClient create(
this.messageQueue = messageQueue;
this.service = service;
this.size = maxQueueSize;
this.maximumRetries = maximumRetries;
this.log = log;
this.callbacks = callbacks;
this.looperExecutor = Executors.newSingleThreadExecutor(threadFactory);
Expand Down Expand Up @@ -220,7 +225,8 @@ public void run() {
"Batching %s message(s) into batch %s.",
messages.size(),
batch.sequence());
networkExecutor.submit(BatchUploadTask.create(AnalyticsClient.this, batch));
networkExecutor.submit(
BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries));
messages = new ArrayList<>();
}
}
Expand All @@ -244,15 +250,17 @@ static class BatchUploadTask implements Runnable {
private final AnalyticsClient client;
private final Backo backo;
final Batch batch;
private final int maxRetries;

static BatchUploadTask create(AnalyticsClient client, Batch batch) {
return new BatchUploadTask(client, BACKO, batch);
static BatchUploadTask create(AnalyticsClient client, Batch batch, int maxRetries) {
return new BatchUploadTask(client, BACKO, batch, maxRetries);
}

BatchUploadTask(AnalyticsClient client, Backo backo, Batch batch) {
BatchUploadTask(AnalyticsClient client, Backo backo, Batch batch, int maxRetries) {
this.client = client;
this.batch = batch;
this.backo = backo;
this.maxRetries = maxRetries;
}

private void notifyCallbacksWithException(Batch batch, Exception exception) {
Expand Down Expand Up @@ -314,7 +322,8 @@ boolean upload() {

@Override
public void run() {
for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
int attempt = 0;
for (; attempt <= maxRetries; attempt++) {
boolean retry = upload();
if (!retry) return;
try {
Expand All @@ -327,7 +336,8 @@ public void run() {
}

client.log.print(ERROR, "Could not upload batch %s. Retries exhausted.", batch.sequence());
notifyCallbacksWithException(batch, new IOException(MAX_ATTEMPTS + " retries exhausted"));
notifyCallbacksWithException(
batch, new IOException(Integer.toString(attempt) + " retries exhausted"));
}

private static boolean is5xx(int status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,23 @@ public void nullLog() {
}
}

@Test
public void negativeRetryCount() {
try {
builder.retries(0);
fail("Should fail for retries less than 1");
} catch (IllegalArgumentException e) {
assertThat(e).hasMessage("retries must be at least 1");
}

try {
builder.retries(-1);
fail("Should fail for retries less than 1");
} catch (IllegalArgumentException e) {
assertThat(e).hasMessage("retries must be at least 1");
}
}

@Test
public void nullTransformer() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ public class AnalyticsClientTest {
private static final Backo BACKO =
Backo.builder().base(TimeUnit.NANOSECONDS, 1).factor(1).build();

Log log = Log.NONE;
private int DEFAULT_RETRIES = 10;

Log log = Log.NONE;
ThreadFactory threadFactory;
BlockingQueue<Message> messageQueue;
@Mock BlockingQueue<Message> messageQueue;
@Mock SegmentService segmentService;
@Mock ExecutorService networkExecutor;
@Mock Callback callback;
Expand All @@ -87,6 +88,7 @@ AnalyticsClient newClient() {
segmentService,
50,
TimeUnit.HOURS.toMillis(1),
DEFAULT_RETRIES,
log,
threadFactory,
networkExecutor,
Expand Down Expand Up @@ -244,7 +246,7 @@ public void batchRetriesForNetworkErrors() {
.thenReturn(Calls.response(failureResponse))
.thenReturn(Calls.response(successResponse));

BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch);
BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES);
batchUploadTask.run();

// Verify that we tried to upload 4 times, 3 failed and 1 succeeded.
Expand All @@ -269,7 +271,7 @@ public void batchRetriesForHTTP5xxErrors() {
.thenReturn(Calls.response(failResponse))
.thenReturn(Calls.response(successResponse));

BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch);
BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES);
batchUploadTask.run();

// Verify that we tried to upload 4 times, 3 failed and 1 succeeded.
Expand All @@ -293,7 +295,7 @@ public void batchRetriesForHTTP429Errors() {
.thenReturn(Calls.response(failResponse))
.thenReturn(Calls.response(successResponse));

BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch);
BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES);
batchUploadTask.run();

// Verify that we tried to upload 4 times, 3 failed and 1 succeeded.
Expand All @@ -312,7 +314,7 @@ public void batchDoesNotRetryForNon5xxAndNon429HTTPErrors() {
Response.error(404, ResponseBody.create(null, "Not Found"));
when(segmentService.upload(batch)).thenReturn(Calls.response(failResponse));

BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch);
BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES);
batchUploadTask.run();

// Verify we only tried to upload once.
Expand All @@ -329,7 +331,7 @@ public void batchDoesNotRetryForNonNetworkErrors() {
Call<UploadResponse> networkFailure = Calls.failure(new RuntimeException());
when(segmentService.upload(batch)).thenReturn(networkFailure);

BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch);
BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES);
batchUploadTask.run();

// Verify we only tried to upload once.
Expand All @@ -353,19 +355,20 @@ public Call<UploadResponse> answer(InvocationOnMock invocation) {
}
});

BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch);
BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 10);
batchUploadTask.run();

// 50 == MAX_ATTEMPTS in AnalyticsClient.java
verify(segmentService, times(50)).upload(batch);
// DEFAULT_RETRIES == maxRetries
// tries 11(one normal run + 10 retries) even though default is 50 in AnalyticsClient.java
verify(segmentService, times(11)).upload(batch);
verify(callback)
.failure(
eq(trackMessage),
argThat(
new ArgumentMatcher<IOException>() {
@Override
public boolean matches(IOException exception) {
return exception.getMessage().equals("50 retries exhausted");
return exception.getMessage().equals("11 retries exhausted");
}
}));
}
Expand Down Expand Up @@ -455,4 +458,37 @@ public void shutdownWithMessagesInTheQueue(MessageBuilderTest builder)
verify(networkExecutor).awaitTermination(1, TimeUnit.SECONDS);
verify(networkExecutor).submit(any(AnalyticsClient.BatchUploadTask.class));
}

@Test
public void neverRetries() {
AnalyticsClient client = newClient();
TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build();
Batch batch = batchFor(trackMessage);

when(segmentService.upload(batch))
.thenAnswer(
new Answer<Call<UploadResponse>>() {
public Call<UploadResponse> answer(InvocationOnMock invocation) {
Response<UploadResponse> failResponse =
Response.error(429, ResponseBody.create(null, "Not Found"));
return Calls.response(failResponse);
}
});

BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 0);
batchUploadTask.run();

// runs once but never retries
verify(segmentService, times(1)).upload(batch);
verify(callback)
.failure(
eq(trackMessage),
argThat(
new ArgumentMatcher<IOException>() {
@Override
public boolean matches(IOException exception) {
return exception.getMessage().equals("1 retries exhausted");
}
}));
}
}