From 1d7758a184dd86b3c27ebe6f1850e4c7fd30c437 Mon Sep 17 00:00:00 2001 From: Patrick Bassut Date: Thu, 7 Jan 2021 01:43:25 -0300 Subject: [PATCH 1/4] configurable retry count on analyticsClient --- .../java/com/segment/analytics/Analytics.java | 11 +++++++++ .../analytics/internal/AnalyticsClient.java | 19 ++++++++++----- .../internal/AnalyticsClientTest.java | 23 +++++++++++-------- 3 files changed, 37 insertions(+), 16 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/Analytics.java b/analytics/src/main/java/com/segment/analytics/Analytics.java index 51863377..8628f42d 100644 --- a/analytics/src/main/java/com/segment/analytics/Analytics.java +++ b/analytics/src/main/java/com/segment/analytics/Analytics.java @@ -109,6 +109,7 @@ public static class Builder { private ExecutorService networkExecutor; private ThreadFactory threadFactory; private int flushQueueSize; + private int maximumFlushAttempts; private long flushIntervalInMillis; private List callbacks; @@ -223,6 +224,15 @@ public Builder flushInterval(long flushInterval, TimeUnit unit) { return this; } + /** Set the interval at which the queue should be flushed. */ + public Builder retries(int maximumRetries) { + if (maximumFlushAttempts < 0) { + throw new IllegalArgumentException("retries must be greater than 0"); + } + this.maximumFlushAttempts = maximumRetries; + return this; + } + /** Set the {@link ExecutorService} on which all HTTP requests will be made. */ public Builder networkExecutor(ExecutorService networkExecutor) { if (networkExecutor == null) { @@ -349,6 +359,7 @@ public void log(String message) { segmentService, flushQueueSize, flushIntervalInMillis, + maximumFlushAttempts, log, threadFactory, networkExecutor, diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index cba72329..f65c1361 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -48,6 +48,7 @@ public class AnalyticsClient { private final BlockingQueue messageQueue; private final SegmentService service; private final int size; + private final int maximumRetries; private final Log log; private final List callbacks; private final ExecutorService networkExecutor; @@ -58,6 +59,7 @@ public static AnalyticsClient create( SegmentService segmentService, int flushQueueSize, long flushIntervalInMillis, + int maximumRetries, Log log, ThreadFactory threadFactory, ExecutorService networkExecutor, @@ -67,6 +69,7 @@ public static AnalyticsClient create( segmentService, flushQueueSize, flushIntervalInMillis, + maximumRetries, log, threadFactory, networkExecutor, @@ -78,6 +81,7 @@ public static AnalyticsClient create( SegmentService service, int maxQueueSize, long flushIntervalInMillis, + int maximumRetries, Log log, ThreadFactory threadFactory, ExecutorService networkExecutor, @@ -85,6 +89,7 @@ public static AnalyticsClient create( this.messageQueue = messageQueue; this.service = service; this.size = maxQueueSize; + this.maximumRetries = maximumRetries; this.log = log; this.callbacks = callbacks; this.looperExecutor = Executors.newSingleThreadExecutor(threadFactory); @@ -170,7 +175,7 @@ public void run() { "Batching %s message(s) into batch %s.", messages.size(), batch.sequence()); - networkExecutor.submit(BatchUploadTask.create(AnalyticsClient.this, batch)); + 
networkExecutor.submit(BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); messages = new ArrayList<>(); } } @@ -192,15 +197,17 @@ static class BatchUploadTask implements Runnable { private final AnalyticsClient client; private final Backo backo; final Batch batch; + private final int maximumFlushAttempts; - static BatchUploadTask create(AnalyticsClient client, Batch batch) { - return new BatchUploadTask(client, BACKO, batch); + static BatchUploadTask create(AnalyticsClient client, Batch batch, int maximumFlushAttempts) { + return new BatchUploadTask(client, BACKO, batch, maximumFlushAttempts); } - BatchUploadTask(AnalyticsClient client, Backo backo, Batch batch) { + BatchUploadTask(AnalyticsClient client, Backo backo, Batch batch, int maximumRetries) { this.client = client; this.batch = batch; this.backo = backo; + this.maximumFlushAttempts = maximumRetries; } private void notifyCallbacksWithException(Batch batch, Exception exception) { @@ -262,7 +269,7 @@ boolean upload() { @Override public void run() { - for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) { + for (int attempt = 0; attempt < maximumFlushAttempts; attempt++) { boolean retry = upload(); if (!retry) return; try { @@ -275,7 +282,7 @@ public void run() { } client.log.print(ERROR, "Could not upload batch %s. Retries exhausted.", batch.sequence()); - notifyCallbacksWithException(batch, new IOException(MAX_ATTEMPTS + " retries exhausted")); + notifyCallbacksWithException(batch, new IOException(maximumFlushAttempts + " retries exhausted")); } private static boolean is5xx(int status) { diff --git a/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java b/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java index 9540fc9a..017e86df 100644 --- a/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java +++ b/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java @@ -52,6 +52,8 @@ public class AnalyticsClientTest { // Backo instance for testing which trims down the wait times. private static final Backo BACKO = Backo.builder().base(TimeUnit.NANOSECONDS, 1).factor(1).build(); + private int DEFAULT_RETRIES = 10; + Log log = Log.NONE; ThreadFactory threadFactory; @Mock @@ -74,7 +76,7 @@ public void setUp() { // Defers loading the client until tests can initialize all required // dependencies. AnalyticsClient newClient() { - return new AnalyticsClient(messageQueue, segmentService, 50, TimeUnit.HOURS.toMillis(1), log, threadFactory, + return new AnalyticsClient(messageQueue, segmentService, 50, TimeUnit.HOURS.toMillis(1), DEFAULT_RETRIES, log, threadFactory, networkExecutor, Collections.singletonList(callback)); } @@ -228,7 +230,7 @@ public void batchRetriesForNetworkErrors() { .thenReturn(Calls.response(failureResponse)).thenReturn(Calls.response(failureResponse)) .thenReturn(Calls.response(successResponse)); - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch); + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); batchUploadTask.run(); // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. 
@@ -250,7 +252,7 @@ public void batchRetriesForHTTP5xxErrors() { .thenReturn(Calls.response(failResponse)).thenReturn(Calls.response(failResponse)) .thenReturn(Calls.response(successResponse)); - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch); + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); batchUploadTask.run(); // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. @@ -271,7 +273,7 @@ public void batchRetriesForHTTP429Errors() { .thenReturn(Calls.response(failResponse)).thenReturn(Calls.response(failResponse)) .thenReturn(Calls.response(successResponse)); - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch); + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); batchUploadTask.run(); // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. @@ -289,7 +291,7 @@ public void batchDoesNotRetryForNon5xxAndNon429HTTPErrors() { Response failResponse = Response.error(404, ResponseBody.create(null, "Not Found")); when(segmentService.upload(batch)).thenReturn(Calls.response(failResponse)); - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch); + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); batchUploadTask.run(); // Verify we only tried to upload once. @@ -306,7 +308,7 @@ public void batchDoesNotRetryForNonNetworkErrors() { Call networkFailure = Calls.failure(new RuntimeException()); when(segmentService.upload(batch)).thenReturn(networkFailure); - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch); + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); batchUploadTask.run(); // Verify we only tried to upload once. 
@@ -327,15 +329,16 @@ public Call answer(InvocationOnMock invocation) { } }); - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch); + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 10); batchUploadTask.run(); - // 50 == MAX_ATTEMPTS in AnalyticsClient.java - verify(segmentService, times(50)).upload(batch); + // 10 == maximumFlushAttempts + // tries only 10 even though default is 50 in AnalyticsClient.java + verify(segmentService, times(10)).upload(batch); verify(callback).failure(eq(trackMessage), argThat(new ArgumentMatcher() { @Override public boolean matches(IOException exception) { - return exception.getMessage().equals("50 retries exhausted"); + return exception.getMessage().equals("10 retries exhausted"); } })); } From 6d996cc9a5347484245949a05ccd17b862e9b4ae Mon Sep 17 00:00:00 2001 From: Patrick Bassut Date: Thu, 7 Jan 2021 01:43:38 -0300 Subject: [PATCH 2/4] simpler logic --- .../src/main/java/com/segment/analytics/Analytics.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/Analytics.java b/analytics/src/main/java/com/segment/analytics/Analytics.java index 8628f42d..c87e0948 100644 --- a/analytics/src/main/java/com/segment/analytics/Analytics.java +++ b/analytics/src/main/java/com/segment/analytics/Analytics.java @@ -285,12 +285,12 @@ public Analytics build() { .registerTypeAdapter(Date.class, new ISO8601DateAdapter()) // .create(); - if (endpoint == null && uploadURL == null) { + if(endpoint == null) { endpoint = DEFAULT_ENDPOINT; - } - if (endpoint == null && uploadURL != null) { - endpoint = uploadURL; + if (uploadURL != null) { + endpoint = uploadURL; + } } if (client == null) { From 1f000df892f2b00c1f12da21ae9620d24d948af7 Mon Sep 17 00:00:00 2001 From: Patrick Bassut Date: Thu, 7 Jan 2021 02:46:24 -0300 Subject: [PATCH 3/4] tests --- .../java/com/segment/analytics/Analytics.java | 8 +- .../analytics/internal/AnalyticsClient.java | 20 +- .../analytics/AnalyticsBuilderTest.java | 17 + .../internal/AnalyticsClientTest.java | 640 ++++++++++-------- 4 files changed, 379 insertions(+), 306 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/Analytics.java b/analytics/src/main/java/com/segment/analytics/Analytics.java index c87e0948..13106d88 100644 --- a/analytics/src/main/java/com/segment/analytics/Analytics.java +++ b/analytics/src/main/java/com/segment/analytics/Analytics.java @@ -224,10 +224,10 @@ public Builder flushInterval(long flushInterval, TimeUnit unit) { return this; } - /** Set the interval at which the queue should be flushed. 
*/ + /** Set how many retries should happen before getting exhausted */ public Builder retries(int maximumRetries) { - if (maximumFlushAttempts < 0) { - throw new IllegalArgumentException("retries must be greater than 0"); + if (maximumRetries < 1) { + throw new IllegalArgumentException("retries must be at least 1"); } this.maximumFlushAttempts = maximumRetries; return this; @@ -285,7 +285,7 @@ public Analytics build() { .registerTypeAdapter(Date.class, new ISO8601DateAdapter()) // .create(); - if(endpoint == null) { + if (endpoint == null) { endpoint = DEFAULT_ENDPOINT; if (uploadURL != null) { diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index f65c1361..0fa748fa 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -26,8 +26,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.lang.instrument.Instrumentation; - import retrofit2.Call; import retrofit2.Response; @@ -44,7 +42,6 @@ public class AnalyticsClient { CONTEXT = Collections.unmodifiableMap(context); } - private final BlockingQueue messageQueue; private final SegmentService service; private final int size; @@ -130,9 +127,11 @@ public int messageSizeInBytes(TrackMessage message) { } private Boolean isBackPressured() { - int messageQueueSize = messageQueue.stream() - .map(message -> messageSizeInBytes((TrackMessage) message)) - .reduce(0, (messageASize, messageBSize) -> messageASize + messageBSize); + int messageQueueSize = + messageQueue + .stream() + .map(message -> messageSizeInBytes((TrackMessage) message)) + .reduce(0, (messageASize, messageBSize) -> messageASize + messageBSize); return messageQueueSize >= MESSAGE_QUEUE_MAX_BYTE_SIZE; } @@ -175,7 +174,8 @@ public void run() { "Batching %s message(s) into batch %s.", messages.size(), batch.sequence()); - networkExecutor.submit(BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); + networkExecutor.submit( + BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); messages = new ArrayList<>(); } } @@ -269,7 +269,8 @@ boolean upload() { @Override public void run() { - for (int attempt = 0; attempt < maximumFlushAttempts; attempt++) { + int attempt = 0; + for (; attempt <= maximumFlushAttempts; attempt++) { boolean retry = upload(); if (!retry) return; try { @@ -282,7 +283,8 @@ public void run() { } client.log.print(ERROR, "Could not upload batch %s. 
Retries exhausted.", batch.sequence()); - notifyCallbacksWithException(batch, new IOException(maximumFlushAttempts + " retries exhausted")); + notifyCallbacksWithException( + batch, new IOException(Integer.toString(attempt) + " retries exhausted")); } private static boolean is5xx(int status) { diff --git a/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java b/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java index b8c6890c..23182d08 100644 --- a/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java +++ b/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java @@ -95,6 +95,23 @@ public void nullLog() { } } + @Test + public void negativeRetryCount() { + try { + builder.retries(0); + fail("Should fail for retries less than 1"); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessage("retries must be at least 1"); + } + + try { + builder.retries(-1); + fail("Should fail for retries less than 1"); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessage("retries must be at least 1"); + } + } + @Test public void nullTransformer() { try { diff --git a/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java b/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java index 017e86df..2f0afe7e 100644 --- a/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java +++ b/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java @@ -49,297 +49,351 @@ @RunWith(BurstJUnit4.class) // public class AnalyticsClientTest { - // Backo instance for testing which trims down the wait times. - private static final Backo BACKO = Backo.builder().base(TimeUnit.NANOSECONDS, 1).factor(1).build(); - - private int DEFAULT_RETRIES = 10; - - Log log = Log.NONE; - ThreadFactory threadFactory; - @Mock - BlockingQueue messageQueue; - @Mock - SegmentService segmentService; - @Mock - ExecutorService networkExecutor; - @Mock - Callback callback; - @Mock - UploadResponse response; - - @Before - public void setUp() { - initMocks(this); - threadFactory = Executors.defaultThreadFactory(); - } - - // Defers loading the client until tests can initialize all required - // dependencies. - AnalyticsClient newClient() { - return new AnalyticsClient(messageQueue, segmentService, 50, TimeUnit.HOURS.toMillis(1), DEFAULT_RETRIES, log, threadFactory, - networkExecutor, Collections.singletonList(callback)); - } - - @Test - public void enqueueAddsToQueue(MessageBuilderTest builder) throws InterruptedException { - AnalyticsClient client = newClient(); - - Message message = builder.get().userId("prateek").build(); - client.enqueue(message); - - verify(messageQueue).put(message); - } - - @Test - public void shutdown() { - AnalyticsClient client = newClient(); - - client.shutdown(); - - verify(messageQueue).clear(); - verify(networkExecutor).shutdown(); - } - - @Test - public void flushInsertsPoison() throws InterruptedException { - AnalyticsClient client = newClient(); - - client.flush(); - - verify(messageQueue).put(FlushMessage.POISON); - } - - /** Wait until the queue is drained. */ - static void wait(Queue queue) { - // noinspection StatementWithEmptyBody - while (queue.size() > 0) { - } - } - - /** - * Verify that a {@link BatchUploadTask} was submitted to the executor, and - * return the {@link BatchUploadTask#batch} it was uploading.. 
- */ - static Batch captureBatch(ExecutorService executor) { - final ArgumentCaptor runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); - verify(executor, timeout(1000)).submit(runnableArgumentCaptor.capture()); - final BatchUploadTask task = (BatchUploadTask) runnableArgumentCaptor.getValue(); - return task.batch; - } - - @Test - public void flushSubmitsToExecutor() { - messageQueue = new LinkedBlockingQueue<>(); - AnalyticsClient client = newClient(); - - TrackMessage first = TrackMessage.builder("foo").userId("bar").build(); - TrackMessage second = TrackMessage.builder("qaz").userId("qux").build(); - client.enqueue(first); - client.enqueue(second); - client.flush(); - wait(messageQueue); - - assertThat(captureBatch(networkExecutor).batch()).containsExactly(first, second); - } - - @Test - public void enqueueMaxTriggersFlush() { - messageQueue = new LinkedBlockingQueue<>(); - AnalyticsClient client = newClient(); - - // Enqueuing 51 messages (> 50) should trigger flush. - for (int i = 0; i < 51; i++) { - client.enqueue(TrackMessage.builder("Event " + i).userId("bar").build()); - } - wait(messageQueue); - - // Verify that the executor saw the batch. - assertThat(captureBatch(networkExecutor).batch()).hasSize(50); - } - - private static String createDataSize(int msgSize) { - char[] chars = new char[msgSize]; - Arrays.fill(chars, 'a'); - - return new String(chars); - } - - @Test - public void calculatesMessageByteSize() { - AnalyticsClient client = newClient(); - Map properties = new HashMap(); - - properties.put("dummy-property", createDataSize(1024 * 33)); - - TrackMessage bigMessage = TrackMessage.builder("Big Event").userId("bar").properties(properties).build(); - client.enqueue(bigMessage); - - // can't test for exact size cause other attributes come in play - assertThat(client.messageSizeInBytes(bigMessage)).isGreaterThan(1024 * 33); - } - - @Test - public void dontEnqueueWhenReachesMaxSize() throws InterruptedException { - AnalyticsClient client = newClient(); - Map properties = new HashMap(); - - properties.put("dummy-property", createDataSize(1024 * 33)); - - TrackMessage bigMessage = TrackMessage.builder("Big Event").userId("bar").properties(properties).build(); - client.enqueue(bigMessage); - - // assertThat(client.messageSizeInBytes(bigMessage)).isEqualTo(30); - - Message tinyMessage = TrackMessage.builder("Tinny Event").userId("bar").build(); - client.enqueue(tinyMessage); - wait(messageQueue); - - verify(messageQueue, times(2)).put(any(Message.class)); - } - - @Test - public void enqueueBeforeMaxDoesNotTriggerFlush() { - messageQueue = new LinkedBlockingQueue<>(); - AnalyticsClient client = newClient(); - - // Enqueuing 5 messages (< 50) should not trigger flush. - for (int i = 0; i < 5; i++) { - client.enqueue(TrackMessage.builder("Event " + i).userId("bar").build()); - } - wait(messageQueue); - - // Verify that the executor didn't see anything. - verify(networkExecutor, never()).submit(any(Runnable.class)); - } - - static Batch batchFor(Message message) { - return Batch.create(Collections.emptyMap(), Collections.singletonList(message)); - } - - @Test - public void batchRetriesForNetworkErrors() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - Response successResponse = Response.success(200, response); - Response failureResponse = Response.error(429, ResponseBody.create(null, "")); - - // Throw a network error 3 times. 
- when(segmentService.upload(batch)).thenReturn(Calls.response(failureResponse)) - .thenReturn(Calls.response(failureResponse)).thenReturn(Calls.response(failureResponse)) - .thenReturn(Calls.response(successResponse)); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); - batchUploadTask.run(); - - // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. - verify(segmentService, times(4)).upload(batch); - verify(callback).success(trackMessage); - } - - @Test - public void batchRetriesForHTTP5xxErrors() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - // Throw a HTTP error 3 times. - - Response successResponse = Response.success(200, response); - Response failResponse = Response.error(500, ResponseBody.create(null, "Server Error")); - when(segmentService.upload(batch)).thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(failResponse)).thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(successResponse)); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); - batchUploadTask.run(); - - // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. - verify(segmentService, times(4)).upload(batch); - verify(callback).success(trackMessage); - } - - @Test - public void batchRetriesForHTTP429Errors() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - // Throw a HTTP error 3 times. - Response successResponse = Response.success(200, response); - Response failResponse = Response.error(429, ResponseBody.create(null, "Rate Limited")); - when(segmentService.upload(batch)).thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(failResponse)).thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(successResponse)); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); - batchUploadTask.run(); - - // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. - verify(segmentService, times(4)).upload(batch); - verify(callback).success(trackMessage); - } - - @Test - public void batchDoesNotRetryForNon5xxAndNon429HTTPErrors() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - // Throw a HTTP error that should not be retried. - Response failResponse = Response.error(404, ResponseBody.create(null, "Not Found")); - when(segmentService.upload(batch)).thenReturn(Calls.response(failResponse)); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); - batchUploadTask.run(); - - // Verify we only tried to upload once. 
- verify(segmentService).upload(batch); - verify(callback).failure(eq(trackMessage), any(IOException.class)); - } - - @Test - public void batchDoesNotRetryForNonNetworkErrors() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - Call networkFailure = Calls.failure(new RuntimeException()); - when(segmentService.upload(batch)).thenReturn(networkFailure); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); - batchUploadTask.run(); - - // Verify we only tried to upload once. - verify(segmentService).upload(batch); - verify(callback).failure(eq(trackMessage), any(RuntimeException.class)); - } - - @Test - public void givesUpAfterMaxRetries() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - when(segmentService.upload(batch)).thenAnswer(new Answer>() { - public Call answer(InvocationOnMock invocation) { - Response failResponse = Response.error(429, ResponseBody.create(null, "Not Found")); - return Calls.response(failResponse); - } - }); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 10); - batchUploadTask.run(); - - // 10 == maximumFlushAttempts - // tries only 10 even though default is 50 in AnalyticsClient.java - verify(segmentService, times(10)).upload(batch); - verify(callback).failure(eq(trackMessage), argThat(new ArgumentMatcher() { - @Override - public boolean matches(IOException exception) { - return exception.getMessage().equals("10 retries exhausted"); - } - })); - } + // Backo instance for testing which trims down the wait times. + private static final Backo BACKO = + Backo.builder().base(TimeUnit.NANOSECONDS, 1).factor(1).build(); + + private int DEFAULT_RETRIES = 10; + + Log log = Log.NONE; + ThreadFactory threadFactory; + @Mock BlockingQueue messageQueue; + @Mock SegmentService segmentService; + @Mock ExecutorService networkExecutor; + @Mock Callback callback; + @Mock UploadResponse response; + + @Before + public void setUp() { + initMocks(this); + threadFactory = Executors.defaultThreadFactory(); + } + + // Defers loading the client until tests can initialize all required + // dependencies. + AnalyticsClient newClient() { + return new AnalyticsClient( + messageQueue, + segmentService, + 50, + TimeUnit.HOURS.toMillis(1), + DEFAULT_RETRIES, + log, + threadFactory, + networkExecutor, + Collections.singletonList(callback)); + } + + @Test + public void enqueueAddsToQueue(MessageBuilderTest builder) throws InterruptedException { + AnalyticsClient client = newClient(); + + Message message = builder.get().userId("prateek").build(); + client.enqueue(message); + + verify(messageQueue).put(message); + } + + @Test + public void shutdown() { + AnalyticsClient client = newClient(); + + client.shutdown(); + + verify(messageQueue).clear(); + verify(networkExecutor).shutdown(); + } + + @Test + public void flushInsertsPoison() throws InterruptedException { + AnalyticsClient client = newClient(); + + client.flush(); + + verify(messageQueue).put(FlushMessage.POISON); + } + + /** Wait until the queue is drained. */ + static void wait(Queue queue) { + // noinspection StatementWithEmptyBody + while (queue.size() > 0) {} + } + + /** + * Verify that a {@link BatchUploadTask} was submitted to the executor, and return the {@link + * BatchUploadTask#batch} it was uploading.. 
+ */ + static Batch captureBatch(ExecutorService executor) { + final ArgumentCaptor runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(executor, timeout(1000)).submit(runnableArgumentCaptor.capture()); + final BatchUploadTask task = (BatchUploadTask) runnableArgumentCaptor.getValue(); + return task.batch; + } + + @Test + public void flushSubmitsToExecutor() { + messageQueue = new LinkedBlockingQueue<>(); + AnalyticsClient client = newClient(); + + TrackMessage first = TrackMessage.builder("foo").userId("bar").build(); + TrackMessage second = TrackMessage.builder("qaz").userId("qux").build(); + client.enqueue(first); + client.enqueue(second); + client.flush(); + wait(messageQueue); + + assertThat(captureBatch(networkExecutor).batch()).containsExactly(first, second); + } + + @Test + public void enqueueMaxTriggersFlush() { + messageQueue = new LinkedBlockingQueue<>(); + AnalyticsClient client = newClient(); + + // Enqueuing 51 messages (> 50) should trigger flush. + for (int i = 0; i < 51; i++) { + client.enqueue(TrackMessage.builder("Event " + i).userId("bar").build()); + } + wait(messageQueue); + + // Verify that the executor saw the batch. + assertThat(captureBatch(networkExecutor).batch()).hasSize(50); + } + + private static String createDataSize(int msgSize) { + char[] chars = new char[msgSize]; + Arrays.fill(chars, 'a'); + + return new String(chars); + } + + @Test + public void calculatesMessageByteSize() { + AnalyticsClient client = newClient(); + Map properties = new HashMap(); + + properties.put("dummy-property", createDataSize(1024 * 33)); + + TrackMessage bigMessage = + TrackMessage.builder("Big Event").userId("bar").properties(properties).build(); + client.enqueue(bigMessage); + + // can't test for exact size cause other attributes come in play + assertThat(client.messageSizeInBytes(bigMessage)).isGreaterThan(1024 * 33); + } + + @Test + public void dontEnqueueWhenReachesMaxSize() throws InterruptedException { + AnalyticsClient client = newClient(); + Map properties = new HashMap(); + + properties.put("dummy-property", createDataSize(1024 * 33)); + + TrackMessage bigMessage = + TrackMessage.builder("Big Event").userId("bar").properties(properties).build(); + client.enqueue(bigMessage); + + // assertThat(client.messageSizeInBytes(bigMessage)).isEqualTo(30); + + Message tinyMessage = TrackMessage.builder("Tinny Event").userId("bar").build(); + client.enqueue(tinyMessage); + wait(messageQueue); + + verify(messageQueue, times(2)).put(any(Message.class)); + } + + @Test + public void enqueueBeforeMaxDoesNotTriggerFlush() { + messageQueue = new LinkedBlockingQueue<>(); + AnalyticsClient client = newClient(); + + // Enqueuing 5 messages (< 50) should not trigger flush. + for (int i = 0; i < 5; i++) { + client.enqueue(TrackMessage.builder("Event " + i).userId("bar").build()); + } + wait(messageQueue); + + // Verify that the executor didn't see anything. + verify(networkExecutor, never()).submit(any(Runnable.class)); + } + + static Batch batchFor(Message message) { + return Batch.create(Collections.emptyMap(), Collections.singletonList(message)); + } + + @Test + public void batchRetriesForNetworkErrors() { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + Response successResponse = Response.success(200, response); + Response failureResponse = Response.error(429, ResponseBody.create(null, "")); + + // Throw a network error 3 times. 
+ when(segmentService.upload(batch)) + .thenReturn(Calls.response(failureResponse)) + .thenReturn(Calls.response(failureResponse)) + .thenReturn(Calls.response(failureResponse)) + .thenReturn(Calls.response(successResponse)); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); + batchUploadTask.run(); + + // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. + verify(segmentService, times(4)).upload(batch); + verify(callback).success(trackMessage); + } + + @Test + public void batchRetriesForHTTP5xxErrors() { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + // Throw a HTTP error 3 times. + + Response successResponse = Response.success(200, response); + Response failResponse = + Response.error(500, ResponseBody.create(null, "Server Error")); + when(segmentService.upload(batch)) + .thenReturn(Calls.response(failResponse)) + .thenReturn(Calls.response(failResponse)) + .thenReturn(Calls.response(failResponse)) + .thenReturn(Calls.response(successResponse)); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); + batchUploadTask.run(); + + // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. + verify(segmentService, times(4)).upload(batch); + verify(callback).success(trackMessage); + } + + @Test + public void batchRetriesForHTTP429Errors() { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + // Throw a HTTP error 3 times. + Response successResponse = Response.success(200, response); + Response failResponse = + Response.error(429, ResponseBody.create(null, "Rate Limited")); + when(segmentService.upload(batch)) + .thenReturn(Calls.response(failResponse)) + .thenReturn(Calls.response(failResponse)) + .thenReturn(Calls.response(failResponse)) + .thenReturn(Calls.response(successResponse)); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); + batchUploadTask.run(); + + // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. + verify(segmentService, times(4)).upload(batch); + verify(callback).success(trackMessage); + } + + @Test + public void batchDoesNotRetryForNon5xxAndNon429HTTPErrors() { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + // Throw a HTTP error that should not be retried. + Response failResponse = + Response.error(404, ResponseBody.create(null, "Not Found")); + when(segmentService.upload(batch)).thenReturn(Calls.response(failResponse)); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); + batchUploadTask.run(); + + // Verify we only tried to upload once. 
+ verify(segmentService).upload(batch); + verify(callback).failure(eq(trackMessage), any(IOException.class)); + } + + @Test + public void batchDoesNotRetryForNonNetworkErrors() { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + Call networkFailure = Calls.failure(new RuntimeException()); + when(segmentService.upload(batch)).thenReturn(networkFailure); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); + batchUploadTask.run(); + + // Verify we only tried to upload once. + verify(segmentService).upload(batch); + verify(callback).failure(eq(trackMessage), any(RuntimeException.class)); + } + + @Test + public void givesUpAfterMaxRetries() { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + when(segmentService.upload(batch)) + .thenAnswer( + new Answer>() { + public Call answer(InvocationOnMock invocation) { + Response failResponse = + Response.error(429, ResponseBody.create(null, "Not Found")); + return Calls.response(failResponse); + } + }); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 10); + batchUploadTask.run(); + + // 10 == maximumFlushAttempts + // tries 11(one normal run + 10 retries) even though default is 50 in AnalyticsClient.java + verify(segmentService, times(11)).upload(batch); + verify(callback) + .failure( + eq(trackMessage), + argThat( + new ArgumentMatcher() { + @Override + public boolean matches(IOException exception) { + return exception.getMessage().equals("11 retries exhausted"); + } + })); + } + + @Test + public void neverRetries() { + AnalyticsClient client = newClient(); + TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); + Batch batch = batchFor(trackMessage); + + when(segmentService.upload(batch)) + .thenAnswer( + new Answer>() { + public Call answer(InvocationOnMock invocation) { + Response failResponse = + Response.error(429, ResponseBody.create(null, "Not Found")); + return Calls.response(failResponse); + } + }); + + BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 0); + batchUploadTask.run(); + + // runs once but never retries + verify(segmentService, times(1)).upload(batch); + verify(callback) + .failure( + eq(trackMessage), + argThat( + new ArgumentMatcher() { + @Override + public boolean matches(IOException exception) { + return exception.getMessage().equals("1 retries exhausted"); + } + })); + } } From 2cb8c329285f7adaa7bd9ccc5c704796561041eb Mon Sep 17 00:00:00 2001 From: Patrick Bassut Date: Wed, 20 Jan 2021 18:40:54 -0300 Subject: [PATCH 4/4] renaming variable to maxRetries to keep consistent --- .../segment/analytics/internal/AnalyticsClient.java | 12 ++++++------ .../analytics/internal/AnalyticsClientTest.java | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index 7939811a..a4b85e12 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -250,17 +250,17 @@ static class BatchUploadTask implements Runnable { private final AnalyticsClient client; private final Backo backo; final Batch batch; - private final int 
maximumFlushAttempts; + private final int maxRetries; - static BatchUploadTask create(AnalyticsClient client, Batch batch, int maximumFlushAttempts) { - return new BatchUploadTask(client, BACKO, batch, maximumFlushAttempts); + static BatchUploadTask create(AnalyticsClient client, Batch batch, int maxRetries) { + return new BatchUploadTask(client, BACKO, batch, maxRetries); } - BatchUploadTask(AnalyticsClient client, Backo backo, Batch batch, int maximumRetries) { + BatchUploadTask(AnalyticsClient client, Backo backo, Batch batch, int maxRetries) { this.client = client; this.batch = batch; this.backo = backo; - this.maximumFlushAttempts = maximumRetries; + this.maxRetries = maxRetries; } private void notifyCallbacksWithException(Batch batch, Exception exception) { @@ -323,7 +323,7 @@ boolean upload() { @Override public void run() { int attempt = 0; - for (; attempt <= maximumFlushAttempts; attempt++) { + for (; attempt <= maxRetries; attempt++) { boolean retry = upload(); if (!retry) return; try { diff --git a/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java b/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java index ec62f125..93053a48 100644 --- a/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java +++ b/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java @@ -358,7 +358,7 @@ public Call answer(InvocationOnMock invocation) { BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 10); batchUploadTask.run(); - // DEFAULT_RETRIES == maximumFlushAttempts + // DEFAULT_RETRIES == maxRetries // tries 11(one normal run + 10 retries) even though default is 50 in AnalyticsClient.java verify(segmentService, times(11)).upload(batch); verify(callback)
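
Usage note (reviewer sketch, not part of the patches above): the snippet below shows how the new retries(...) builder option introduced by this series would be configured. Only retries(...) comes from these patches; the Analytics.builder(writeKey) entry point, the other builder options, and the enqueue/flush calls are assumed from the library's existing public API, and the write key and values are placeholders.

import com.segment.analytics.Analytics;
import com.segment.analytics.messages.TrackMessage;
import java.util.concurrent.TimeUnit;

public final class ConfigurableRetriesExample {
  public static void main(String[] args) {
    Analytics analytics =
        Analytics.builder("YOUR_WRITE_KEY") // placeholder write key
            .flushQueueSize(50)
            .flushInterval(30, TimeUnit.SECONDS)
            // New in this series: give up on a batch after the initial upload
            // plus 5 retries, instead of the previously hard-coded 50 attempts.
            .retries(5)
            .build();

    // Queue an event and force a flush so the upload (and any retries) runs.
    analytics.enqueue(TrackMessage.builder("Configurable Retries Tried").userId("user-1234"));
    analytics.flush();
  }
}

With retries(5), BatchUploadTask attempts the upload at most six times in total (the loop in patch 4 runs while attempt <= maxRetries), backing off between attempts; once retries are exhausted, registered callbacks receive an IOException whose message ends in "retries exhausted".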