diff --git a/README.md b/README.md index d6570b3..937f9d1 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,9 @@ CircleCI: ![CircleCI Status](https://circleci.com/gh/justinsb/jetcd.png?circle-t A simple Java client library for the awesome [etcd] -Uses the Apache [HttpAsyncClient] to implement watches without blocking a thread, and Google's [Guava] to give us the nice [ListenableFuture] interface. +This fork of jetcd uses [Netty HTTP Client](https://github.com/timboudreau/netty-http-client) to do asynchronous HTTP +requests with minimal resource utilization (the [original](https://github.com/justinsb/jetcd) uses [HttpAsyncClient] +to do the same - the Netty client is lighter-weight, but has more dependencies). Check out [SmokeTest.java] to see how this is used (and tested), but here's a quick code example: diff --git a/pom.xml b/pom.xml index 4c6f8c7..6136d97 100644 --- a/pom.xml +++ b/pom.xml @@ -1,95 +1,137 @@ - - 4.0.0 - com.justinsb - etcd-client - 0.1-SNAPSHOT - - - ${project.build.directory} - - - - - org.slf4j - slf4j-api - 1.7.5 - - - org.apache.httpcomponents - httpasyncclient - 4.0-beta4 - - - com.google.guava - guava - 14.0.1 - - - com.google.code.gson - gson - 2.2.4 - - - junit - junit - 4.11 - test - - - org.slf4j - slf4j-simple - 1.7.5 - test - - - - - - - maven-assembly-plugin - 2.4 - - 1.5 - - jar-with-dependencies - - - - - make-assembly - package - - single - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 2.3.2 - - 1.6 - 1.6 - UTF-8 - - - - - - - - - org.codehaus.mojo - findbugs-maven-plugin - 2.5.2 - - true - ${artifactTargetPath} - ${artifactTargetPath} - - - - + + + + + 4.0.0 + + + com.mastfrog + 2.9.7 + mastfrog-parent + + + + com.justinsb + etcd-client + + https://github.com/timboudreau/jetcd + 0.3-tboudreau-fork-1 + + + ${project.build.directory} + true + + + + + com.google.code.gson + gson + + + junit + junit + test + + + com.mastfrog + netty-http-client + ${mastfrog.version} + + + + + + + + + maven-assembly-plugin + 3.1.1 + + 1.5 + + jar-with-dependencies + + + + + make-assembly + + + + package + + + + + single + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.0 + + 1.7 + 1.7 + UTF-8 + true + + + + + + + + + org.codehaus.mojo + findbugs-maven-plugin + 3.0.5 + + true + ${artifactTargetPath} + ${artifactTargetPath} + + + + + + + Github + https://github.com/timboudreau/jetcd/issues + + + + https://github.com/timboudreau/jetcd.git + scm:git:https://github.com/timboudreau/jetcd.git + git@github.com/timboudreau/jetcd.git + + + + Mastfrog Technologies + https://mastfrog.com + + + + + MIT + https://opensource.org/licenses/MIT + repo + + + + + + Tim Boudreau + tim@timboudreau.com + https://timboudreau.com + + + diff --git a/src/main/java/com/justinsb/etcd/EtcdClient.java b/src/main/java/com/justinsb/etcd/EtcdClient.java index babc363..0c90fb7 100644 --- a/src/main/java/com/justinsb/etcd/EtcdClient.java +++ b/src/main/java/com/justinsb/etcd/EtcdClient.java @@ -1,29 +1,5 @@ package com.justinsb.etcd; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.URI; -import java.net.URLEncoder; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutionException; - -import org.apache.http.HttpEntity; -import org.apache.http.HttpResponse; -import org.apache.http.StatusLine; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.entity.UrlEncodedFormEntity; -import org.apache.http.client.methods.HttpDelete; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPut; -import org.apache.http.client.methods.HttpUriRequest; -import org.apache.http.concurrent.FutureCallback; -import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; -import org.apache.http.impl.nio.client.HttpAsyncClients; -import org.apache.http.message.BasicNameValuePair; -import org.apache.http.util.EntityUtils; - -import com.google.common.base.Charsets; import com.google.common.base.Splitter; import com.google.common.collect.Lists; import com.google.common.util.concurrent.AsyncFunction; @@ -35,22 +11,47 @@ import com.google.gson.JsonArray; import com.google.gson.JsonParseException; import com.google.gson.JsonParser; +import com.mastfrog.acteur.headers.Method; +import com.mastfrog.mime.MimeType; +import com.mastfrog.netty.http.client.HttpClient; +import com.mastfrog.netty.http.client.HttpRequestBuilder; +import com.mastfrog.netty.http.client.ResponseFuture; +import com.mastfrog.netty.http.client.ResponseHandler; +import com.mastfrog.url.Parameters; +import com.mastfrog.url.ParametersElement; +import com.mastfrog.util.builder.AbstractBuilder; +import com.mastfrog.util.preconditions.Exceptions; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpResponseStatus; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URI; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; public class EtcdClient { - static final CloseableHttpAsyncClient httpClient = buildDefaultHttpClient(); - static final Gson gson = new GsonBuilder().create(); - static CloseableHttpAsyncClient buildDefaultHttpClient() { - // TODO: Increase timeout?? - RequestConfig requestConfig = RequestConfig.custom().build(); - CloseableHttpAsyncClient httpClient = HttpAsyncClients.custom().setDefaultRequestConfig(requestConfig).build(); - httpClient.start(); - return httpClient; - } + static final Gson gson = new GsonBuilder().create(); final URI baseUri; + private final HttpClient httpClient; public EtcdClient(URI baseUri) { + this(baseUri, HttpClient.builder() + .maxChunkSize(512) + .noCompression() + .maxInitialLineLength(255) + .followRedirects() + .threadCount(1) + .dontSend100Continue() + .build()); + } + + public EtcdClient(URI baseUri, HttpClient client) { + this.httpClient = client; String uri = baseUri.toString(); if (!uri.endsWith("/")) { uri += "/"; @@ -64,9 +65,9 @@ public EtcdClient(URI baseUri) { */ public EtcdResult get(String key) throws EtcdClientException { URI uri = buildKeyUri("v2/keys", key, ""); - HttpGet request = new HttpGet(uri); + UriRequest request = new UriRequest(Method.GET, uri); - EtcdResult result = syncExecute(request, new int[] { 200, 404 }, 100); + EtcdResult result = syncExecute(request, new int[]{200, 404}, 100); if (result.isError()) { if (result.errorCode == 100) { return null; @@ -80,9 +81,9 @@ public EtcdResult get(String key) throws EtcdClientException { */ public EtcdResult delete(String key) throws EtcdClientException { URI uri = buildKeyUri("v2/keys", key, ""); - HttpDelete request = new HttpDelete(uri); + UriRequest request = new UriRequest(Method.DELETE, uri); - return syncExecute(request, new int[] { 200, 404 }); + return syncExecute(request, new int[]{200, 404}); } /** @@ -95,7 +96,6 @@ public EtcdResult set(String key, String value) throws EtcdClientException { /** * Sets a key to a new value with an (optional) ttl */ - public EtcdResult set(String key, String value, Integer ttl) throws EtcdClientException { List data = Lists.newArrayList(); data.add(new BasicNameValuePair("value", value)); @@ -103,7 +103,7 @@ public EtcdResult set(String key, String value, Integer ttl) throws EtcdClientEx data.add(new BasicNameValuePair("ttl", Integer.toString(ttl))); } - return set0(key, data, new int[] { 200, 201 }); + return set0(key, data, new int[]{200, 201}); } /** @@ -112,26 +112,27 @@ public EtcdResult set(String key, String value, Integer ttl) throws EtcdClientEx public EtcdResult createDirectory(String key) throws EtcdClientException { List data = Lists.newArrayList(); data.add(new BasicNameValuePair("dir", "true")); - return set0(key, data, new int[] { 200, 201 }); + return set0(key, data, new int[]{200, 201}); } - + /** * Lists a directory */ public List listDirectory(String key) throws EtcdClientException { - EtcdResult result = get(key + "/"); - if (result == null || result.node == null) { - return null; - } - return result.node.nodes; + EtcdResult result = get(key + "/"); + if (result == null || result.node == null) { + return null; + } + return result.node.nodes; } + /** * Delete a directory */ public EtcdResult deleteDirectory(String key) throws EtcdClientException { URI uri = buildKeyUri("v2/keys", key, "?dir=true"); - HttpDelete request = new HttpDelete(uri); - return syncExecute(request, new int[] { 202 }); + UriRequest request = new UriRequest(Method.DELETE, uri); + return syncExecute(request, new int[]{202}); } /** @@ -142,7 +143,7 @@ public EtcdResult cas(String key, String prevValue, String value) throws EtcdCli data.add(new BasicNameValuePair("value", value)); data.add(new BasicNameValuePair("prevValue", prevValue)); - return set0(key, data, new int[] { 200, 412 }, 101); + return set0(key, data, new int[]{200, 412}, 101); } /** @@ -156,18 +157,18 @@ public ListenableFuture watch(String key) throws EtcdClientException * Watches the given subtree */ public ListenableFuture watch(String key, Long index, boolean recursive) throws EtcdClientException { - String suffix = "?wait=true"; - if (index != null) { - suffix += "&waitIndex=" + index; - } - if (recursive) { - suffix += "&recursive=true"; - } + String suffix = "?wait=true"; + if (index != null) { + suffix += "&waitIndex=" + index; + } + if (recursive) { + suffix += "&recursive=true"; + } URI uri = buildKeyUri("v2/keys", key, suffix); - HttpGet request = new HttpGet(uri); + UriRequest request = new UriRequest(Method.GET, uri); - return asyncExecute(request, new int[] { 200 }); + return asyncExecute(request, new int[]{200}); } /** @@ -176,7 +177,7 @@ public ListenableFuture watch(String key, Long index, boolean recurs public String getVersion() throws EtcdClientException { URI uri = baseUri.resolve("version"); - HttpGet request = new HttpGet(uri); + UriRequest request = new UriRequest(Method.GET, uri); // Technically not JSON, but it'll work // This call is the odd one out @@ -187,38 +188,85 @@ public String getVersion() throws EtcdClientException { return s.json; } + static class BasicNameValuePair { + + private final String key; + private final String value; + + public BasicNameValuePair(String key, String value) { + this.key = key; + this.value = value; + } + + } + + private String encodePairs(List pairs) { + AbstractBuilder b = com.mastfrog.url.Parameters.builder(); + StringBuilder sb = new StringBuilder(); + for (BasicNameValuePair p : pairs) { + try { + if (sb.length() > 0) { + sb.append('&'); + } + sb.append(URLEncoder.encode(p.key, "UTF-8")).append('=').append(URLEncoder.encode(p.value, "UTF-8")); + } catch (UnsupportedEncodingException ex) { + Exceptions.chuck(ex); + } + } + return sb.toString(); + } + private EtcdResult set0(String key, List data, int[] httpErrorCodes, int... expectedErrorCodes) throws EtcdClientException { URI uri = buildKeyUri("v2/keys", key, ""); - - HttpPut request = new HttpPut(uri); - - UrlEncodedFormEntity entity = new UrlEncodedFormEntity(data, Charsets.UTF_8); - request.setEntity(entity); + String body = encodePairs(data); + UriRequest request = new UriRequest(Method.PUT, uri, body); return syncExecute(request, httpErrorCodes, expectedErrorCodes); } public EtcdResult listChildren(String key) throws EtcdClientException { URI uri = buildKeyUri("v2/keys", key, "/"); - HttpGet request = new HttpGet(uri); + UriRequest request = new UriRequest(Method.GET, uri); - EtcdResult result = syncExecute(request, new int[] { 200 }); + EtcdResult result = syncExecute(request, new int[]{200}); return result; } - protected ListenableFuture asyncExecute(HttpUriRequest request, int[] expectedHttpStatusCodes, final int... expectedErrorCodes) + static class UriRequest { + + public final Method method; + public final URI uri; + public final String body; + + public UriRequest(Method method, URI uri) { + this(method, uri, null); + } + + public UriRequest(Method method, URI uri, String body) { + this.method = method; + this.uri = uri; + this.body = body; + } + + @Override + public String toString() { + return method.name() + ' ' + uri + (body == null ? "" : " --> " + body); + } + } + + protected ListenableFuture asyncExecute(UriRequest request, int[] expectedHttpStatusCodes, final int... expectedErrorCodes) throws EtcdClientException { ListenableFuture json = asyncExecuteJson(request, expectedHttpStatusCodes); - return Futures.transform(json, new AsyncFunction() { + return Futures.transformAsync(json, new AsyncFunction() { public ListenableFuture apply(JsonResponse json) throws Exception { EtcdResult result = jsonToEtcdResult(json, expectedErrorCodes); return Futures.immediateFuture(result); } - }); + }, ForkJoinPool.commonPool()); } - protected EtcdResult syncExecute(HttpUriRequest request, int[] expectedHttpStatusCodes, int... expectedErrorCodes) throws EtcdClientException { + protected EtcdResult syncExecute(UriRequest request, int[] expectedHttpStatusCodes, int... expectedErrorCodes) throws EtcdClientException { try { return asyncExecute(request, expectedHttpStatusCodes, expectedErrorCodes).get(); } catch (InterruptedException e) { @@ -273,7 +321,7 @@ private static boolean contains(int[] list, int find) { return false; } - protected List syncExecuteList(HttpUriRequest request) throws EtcdClientException { + protected List syncExecuteList(UriRequest request) throws EtcdClientException { JsonResponse response = syncExecuteJson(request, 200); if (response.json == null) { return null; @@ -298,7 +346,7 @@ protected List syncExecuteList(HttpUriRequest request) throws EtcdCl } } - protected JsonResponse syncExecuteJson(HttpUriRequest request, int... expectedHttpStatusCodes) throws EtcdClientException { + protected JsonResponse syncExecuteJson(UriRequest request, int... expectedHttpStatusCodes) throws EtcdClientException { try { return asyncExecuteJson(request, expectedHttpStatusCodes).get(); } catch (InterruptedException e) { @@ -307,40 +355,24 @@ protected JsonResponse syncExecuteJson(HttpUriRequest request, int... expectedHt } catch (ExecutionException e) { throw unwrap(e); } + } + + protected ListenableFuture asyncExecuteJson(UriRequest request, final int[] expectedHttpStatusCodes) throws EtcdClientException { + ListenableFuture response = asyncExecuteHttp(request); - // ListenableFuture response = asyncExecuteHttp(request); - // - // HttpResponse httpResponse; - // try { - // httpResponse = response.get(); - // } catch (InterruptedException e) { - // Thread.currentThread().interrupt(); - // throw new - // EtcdClientException("Interrupted during request processing", e); - // } catch (ExecutionException e) { - // // TODO: Unwrap? - // throw new EtcdClientException("Error executing request", e); - // } - // - // String json = parseJsonResponse(httpResponse); - // return json; - } - - protected ListenableFuture asyncExecuteJson(HttpUriRequest request, final int[] expectedHttpStatusCodes) throws EtcdClientException { - ListenableFuture response = asyncExecuteHttp(request); - - return Futures.transform(response, new AsyncFunction() { - public ListenableFuture apply(HttpResponse httpResponse) throws Exception { + return Futures.transformAsync(response, new AsyncFunction() { + public ListenableFuture apply(Response httpResponse) throws Exception { JsonResponse json = extractJsonResponse(httpResponse, expectedHttpStatusCodes); return Futures.immediateFuture(json); } - }); + }, ForkJoinPool.commonPool()); } /** * We need the status code & the response to parse an error response. */ static class JsonResponse { + final String json; final int httpStatusCode; @@ -351,34 +383,22 @@ public JsonResponse(String json, int statusCode) { } - protected JsonResponse extractJsonResponse(HttpResponse httpResponse, int[] expectedHttpStatusCodes) throws EtcdClientException { - try { - StatusLine statusLine = httpResponse.getStatusLine(); - int statusCode = statusLine.getStatusCode(); + protected JsonResponse extractJsonResponse(Response httpResponse, int[] expectedHttpStatusCodes) throws EtcdClientException { + HttpResponseStatus statusLine = httpResponse.status; + int statusCode = statusLine.code(); - String json = null; + String json = httpResponse.entity; - if (httpResponse.getEntity() != null) { - try { - json = EntityUtils.toString(httpResponse.getEntity()); - } catch (IOException e) { - throw new EtcdClientException("Error reading response", e); - } + if (!contains(expectedHttpStatusCodes, statusCode)) { + if (statusCode == 400 && json != null) { + // More information in JSON + } else { + throw new EtcdClientException("Error response from etcd: " + statusLine.toString(), + statusCode); } - - if (!contains(expectedHttpStatusCodes, statusCode)) { - if (statusCode == 400 && json != null) { - // More information in JSON - } else { - throw new EtcdClientException("Error response from etcd: " + statusLine.getReasonPhrase(), - statusCode); - } - } - - return new JsonResponse(json, statusCode); - } finally { - close(httpResponse); } + + return new JsonResponse(json, statusCode); } private URI buildKeyUri(String prefix, String key, String suffix) { @@ -397,34 +417,59 @@ private URI buildKeyUri(String prefix, String key, String suffix) { return uri; } - protected ListenableFuture asyncExecuteHttp(HttpUriRequest request) { - final SettableFuture future = SettableFuture.create(); + protected ListenableFuture asyncExecuteHttp(UriRequest request) { + final SettableFuture future = SettableFuture.create(); + + class Handler extends ResponseHandler { - httpClient.execute(request, new FutureCallback() { - public void completed(HttpResponse result) { - future.set(result); + public Handler() { + super(String.class); } - public void failed(Exception ex) { - future.setException(ex); + @Override + protected void onError(Throwable err) { + future.setException(err); } - public void cancelled() { - future.setException(new InterruptedException()); + @Override + protected void onErrorResponse(HttpResponseStatus status, HttpHeaders headers, String content) { +// future.setException(new IOException(status + content)); + future.set(new Response(status, headers, content)); } - }); + @Override + protected void receive(HttpResponseStatus status, HttpHeaders headers, String content) { + future.set(new Response(status, headers, content)); + } + } + + HttpRequestBuilder bldr = httpClient.request(request.method).setURL(request.uri.toASCIIString()); + + if (request.body != null) { + try { + bldr.setBody(request.body, MimeType.FORM_DATA); + } catch (IOException ex) { + Exceptions.chuck(ex); + } + } + ResponseFuture fut = bldr.execute(new Handler()); + // Pending - could listen on the FullContentReceived state and respond + // without waiting for the connection to be idle/closed return future; } - public static void close(HttpResponse response) { - if (response == null) { - return; - } - HttpEntity entity = response.getEntity(); - if (entity != null) { - EntityUtils.consumeQuietly(entity); + static class Response { + + public final HttpResponseStatus status; + public final HttpHeaders headers; + public final String entity; + + public Response(HttpResponseStatus status, HttpHeaders headers, String entity) { + this.status = status; + this.headers = headers; + this.entity = entity; } + } protected static String urlEscape(String s) {