Skip to content

Commit

Permalink
Merge pull request #365 from stafichuk/master
Browse files Browse the repository at this point in the history
added a shutdown method to GeoApiContext which stops RateLimitExecutorDelayThread #261
  • Loading branch information
domesticmouse authored Nov 7, 2017
2 parents 6fa1633 + 93a2388 commit d865b9d
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 7 deletions.
5 changes: 5 additions & 0 deletions src/main/java/com/google/maps/GaeRequestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ public <T, R extends ApiResponse<T>> PendingResult<T> handlePost(
req, client, clazz, fieldNamingPolicy, errorTimeout, maxRetries, exceptionsAllowedToRetry);
}

@Override
public void shutdown() {
//do nothing
}

/** Builder strategy for constructing {@code GaeRequestHandler}. */
public static class Builder implements GeoApiContext.RequestHandler.Builder {

Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/google/maps/GeoApiContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ <T, R extends ApiResponse<T>> PendingResult<T> handlePost(
Integer maxRetries,
ExceptionsAllowedToRetry exceptionsAllowedToRetry);

void shutdown();

/** Builder pattern for {@code GeoApiContext.RequestHandler}. */
interface Builder {

Expand All @@ -123,6 +125,10 @@ interface Builder {
}
}

public void shutdown() {
requestHandler.shutdown();
}

<T, R extends ApiResponse<T>> PendingResult<T> get(
ApiConfig config, Class<? extends R> clazz, Map<String, String> params) {
if (channel != null && !channel.isEmpty() && !params.containsKey("channel")) {
Expand Down
11 changes: 9 additions & 2 deletions src/main/java/com/google/maps/OkHttpRequestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.maps.internal.RateLimitExecutorService;
import java.io.IOException;
import java.net.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import okhttp3.Authenticator;
import okhttp3.Credentials;
Expand All @@ -42,9 +43,11 @@
public class OkHttpRequestHandler implements GeoApiContext.RequestHandler {
private static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
private final OkHttpClient client;
private final ExecutorService executorService;

/* package */ OkHttpRequestHandler(OkHttpClient client) {
/* package */ OkHttpRequestHandler(OkHttpClient client, ExecutorService executorService) {
this.client = client;
this.executorService = executorService;
}

@Override
Expand Down Expand Up @@ -87,6 +90,10 @@ public <T, R extends ApiResponse<T>> PendingResult<T> handlePost(
req, client, clazz, fieldNamingPolicy, errorTimeout, maxRetries, exceptionsAllowedToRetry);
}

public void shutdown() {
executorService.shutdown();
}

/** Builder strategy for constructing an {@code OkHTTPRequestHandler}. */
public static class Builder implements GeoApiContext.RequestHandler.Builder {
private final OkHttpClient.Builder builder;
Expand Down Expand Up @@ -149,7 +156,7 @@ public Request authenticate(Route route, Response response) throws IOException {
@Override
public RequestHandler build() {
OkHttpClient client = builder.build();
return new OkHttpRequestHandler(client);
return new OkHttpRequestHandler(client, rateLimitExecutorService);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ public class RateLimitExecutorService implements ExecutorService, Runnable {
private final RateLimiter rateLimiter =
RateLimiter.create(DEFAULT_QUERIES_PER_SECOND, 1, TimeUnit.SECONDS);

final Thread delayThread;

public RateLimitExecutorService() {
setQueriesPerSecond(DEFAULT_QUERIES_PER_SECOND);
Thread delayThread = new Thread(this);
delayThread = new Thread(this);
delayThread.setDaemon(true);
delayThread.setName("RateLimitExecutorDelayThread");
delayThread.start();
Expand All @@ -74,7 +76,9 @@ public void run() {
while (!delegate.isShutdown()) {
this.rateLimiter.acquire();
Runnable r = queue.take();
delegate.execute(r);
if (!delegate.isShutdown()) {
delegate.execute(r);
}
}
} catch (InterruptedException ie) {
LOG.info("Interrupted", ie);
Expand All @@ -97,16 +101,33 @@ public void execute(Runnable runnable) {
queue.add(runnable);
}

// Everything below here is straight delegation.

@Override
public void shutdown() {
delegate.shutdown();
//we need this to break out of queue.take()
execute(
new Runnable() {
@Override
public void run() {
//do nothing
}
});
}

// Everything below here is straight delegation.

@Override
public List<Runnable> shutdownNow() {
return delegate.shutdownNow();
List<Runnable> tasks = delegate.shutdownNow();
//we need this to break out of queue.take()
execute(
new Runnable() {
@Override
public void run() {
//do nothing
}
});
return tasks;
}

@Override
Expand Down
19 changes: 19 additions & 0 deletions src/test/java/com/google/maps/GeoApiContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@

package com.google.maps;

import static com.google.maps.TestUtils.findLastThreadByName;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -291,4 +294,20 @@ public void testToggleIfExceptionIsAllowedToRetry() throws Exception {

fail("OverQueryLimitException was expected but not observed.");
}

@Test
public void testShutdown() throws InterruptedException {
GeoApiContext context = builder.build();
final Thread delayThread = findLastThreadByName("RateLimitExecutorDelayThread");
assertNotNull(
"Delay thread should be created in constructor of RateLimitExecutorService", delayThread);
assertTrue(
"Delay thread should start in constructor of RateLimitExecutorService",
delayThread.isAlive());
//this is needed to make sure that delay thread has reached queue.take()
delayThread.join(10);
context.shutdown();
delayThread.join(10);
assertFalse(delayThread.isAlive());
}
}
14 changes: 14 additions & 0 deletions src/test/java/com/google/maps/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,18 @@ public static String retrieveBody(String filename) {
return body;
}
}

public static Thread findLastThreadByName(String name) {
ThreadGroup currentThreadGroup = Thread.currentThread().getThreadGroup();
Thread[] threads = new Thread[1000];
currentThreadGroup.enumerate(threads);
Thread delayThread = null;
for (Thread thread : threads) {
if (thread == null) break;
if (thread.getName().equals(name)) {
delayThread = thread;
}
}
return delayThread;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package com.google.maps.internal;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import com.google.maps.MediumTests;
Expand Down Expand Up @@ -90,4 +92,20 @@ private static int countTotalRequests(AbstractMap<?, Integer> hashMap) {
}
return counter;
}

@Test
public void testDelayThreadIsStoppedAfterShutdownIsCalled() throws InterruptedException {
RateLimitExecutorService service = new RateLimitExecutorService();
final Thread delayThread = service.delayThread;
assertNotNull(
"Delay thread should be created in constructor of RateLimitExecutorService", delayThread);
assertTrue(
"Delay thread should start in constructor of RateLimitExecutorService",
delayThread.isAlive());
//this is needed to make sure that delay thread has reached queue.take()
delayThread.join(10);
service.shutdown();
delayThread.join(10);
assertFalse(delayThread.isAlive());
}
}

0 comments on commit d865b9d

Please sign in to comment.