From e996bef334365088c2eaa128e970f7a811c25770 Mon Sep 17 00:00:00 2001 From: Dan Richelson Date: Mon, 10 Apr 2017 17:41:06 -0700 Subject: [PATCH] Add optional proxy authentication. Improved connection closing. --- build.gradle | 2 +- .../launchdarkly/eventsource/EventSource.java | 50 ++++++++++++++++--- 2 files changed, 44 insertions(+), 8 deletions(-) diff --git a/build.gradle b/build.gradle index 74cefb0..352616f 100644 --- a/build.gradle +++ b/build.gradle @@ -12,7 +12,7 @@ repositories { allprojects { group = 'com.launchdarkly' - version = "1.1.1" + version = "1.2.0-SNAPSHOT" sourceCompatibility = 1.7 targetCompatibility = 1.7 } diff --git a/src/main/java/com/launchdarkly/eventsource/EventSource.java b/src/main/java/com/launchdarkly/eventsource/EventSource.java index d3607a9..df66498 100644 --- a/src/main/java/com/launchdarkly/eventsource/EventSource.java +++ b/src/main/java/com/launchdarkly/eventsource/EventSource.java @@ -41,6 +41,7 @@ public class EventSource implements ConnectionHandler, Closeable { private final OkHttpClient client; private volatile Call call; private final Random jitter = new Random(); + private BufferedSource bufferedSource = null; EventSource(Builder builder) { this.uri = builder.uri; @@ -52,14 +53,20 @@ public class EventSource implements ConnectionHandler, Closeable { this.executor = Executors.newCachedThreadPool(threadFactory); this.handler = new AsyncEventHandler(this.executor, builder.handler); this.readyState = new AtomicReference<>(RAW); - this.client = builder.client.newBuilder() + + OkHttpClient.Builder clientBuilder = builder.client.newBuilder() .connectionPool(new ConnectionPool(1, 1, TimeUnit.SECONDS)) .readTimeout(0, TimeUnit.SECONDS) .writeTimeout(0, TimeUnit.SECONDS) .connectTimeout(0, TimeUnit.SECONDS) .retryOnConnectionFailure(true) - .proxy(builder.proxy) - .build(); + .proxy(builder.proxy); + + if (builder.proxyAuthenticator != null) { + clientBuilder.proxyAuthenticator(builder.proxyAuthenticator); + } + + this.client = clientBuilder.build(); } public void start() { @@ -95,6 +102,9 @@ public void close() throws IOException { } } executor.shutdownNow(); + if (bufferedSource != null) { + bufferedSource.close(); + } if (client != null) { if (client.connectionPool() != null) { @@ -143,9 +153,12 @@ private void connect() { } catch (Exception e) { handler.onError(e); } - BufferedSource bs = Okio.buffer(response.body().source()); + if (bufferedSource != null) { + bufferedSource.close(); + } + bufferedSource = Okio.buffer(response.body().source()); EventParser parser = new EventParser(uri, handler, EventSource.this); - for (String line; !Thread.currentThread().isInterrupted() && (line = bs.readUtf8LineStrict()) != null; ) { + for (String line; !Thread.currentThread().isInterrupted() && (line = bufferedSource.readUtf8LineStrict()) != null; ) { parser.line(line); } } else { @@ -255,6 +268,7 @@ public static final class Builder { private final EventHandler handler; private Headers headers = Headers.of(); private Proxy proxy; + private Authenticator proxyAuthenticator = null; private OkHttpClient client = new OkHttpClient(); public Builder(EventHandler handler, URI uri) { @@ -304,8 +318,30 @@ public Builder client(OkHttpClient client) { * @return the builder */ public Builder proxy(String proxyHost, int proxyPort) { - proxy = new Proxy(Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)); - return this; + proxy = new Proxy(Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)); + return this; + } + + /** + * Set the {@link Proxy} to be used to make the EventSource connection. + * + * @param proxy the proxy + * @return the builder + */ + public Builder proxy(Proxy proxy) { + this.proxy = proxy; + return this; + } + + /** + * Sets the Proxy Authentication mechanism if needed. Defaults to no auth. + * + * @param proxyAuthenticator + * @return + */ + public Builder proxyAuthenticator(Authenticator proxyAuthenticator) { + this.proxyAuthenticator = proxyAuthenticator; + return this; } public EventSource build() {