Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds v2 Call interface based on okhttp and retrofit #1705

Merged
merged 2 commits into from
Aug 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@
import okio.Buffer;
import okio.ByteString;
import zipkin.Annotation;
import zipkin.internal.v2.Call;
import zipkin.internal.v2.Span;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.storage.AsyncSpanConsumer;
import zipkin.storage.Callback;
import zipkin.internal.v2.storage.SpanConsumer;
import zipkin.storage.elasticsearch.http.internal.client.HttpCall;

import static zipkin.internal.Util.propagateIfFatal;
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN;

class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final for testing
class ElasticsearchHttpSpanConsumer implements SpanConsumer { // not final for testing
static final Logger LOG = Logger.getLogger(ElasticsearchHttpSpanConsumer.class.getName());

final ElasticsearchHttpStorage es;
Expand All @@ -43,22 +43,14 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final
this.indexNameFormatter = es.indexNameFormatter();
}

@Override public void accept(List<Span> spans, Callback<Void> callback) {
if (spans.isEmpty()) {
callback.onSuccess(null);
return;
}
try {
BulkSpanIndexer indexer = new BulkSpanIndexer(es);
indexSpans(indexer, spans);
indexer.execute(callback);
} catch (Throwable t) {
propagateIfFatal(t);
callback.onError(t);
}
@Override public Call<Void> accept(List<Span> spans) {
if (spans.isEmpty()) return Call.create(null);
BulkSpanIndexer indexer = new BulkSpanIndexer(es);
indexSpans(indexer, spans);
return indexer.newCall();
}

void indexSpans(BulkSpanIndexer indexer, List<Span> spans) throws IOException {
void indexSpans(BulkSpanIndexer indexer, List<Span> spans) {
for (Span span : spans) {
Long spanTimestamp = span.timestamp();
long indexTimestamp = 0L; // which index to store this span into
Expand All @@ -77,7 +69,7 @@ void indexSpans(BulkSpanIndexer indexer, List<Span> spans) throws IOException {
}
}

static class BulkSpanIndexer {
static final class BulkSpanIndexer {
final HttpBulkIndexer indexer;
final IndexNameFormatter indexNameFormatter;

Expand All @@ -92,8 +84,8 @@ void add(long indexTimestamp, Span span, @Nullable Long timestampMillis) {
indexer.add(index, SPAN, document, null /* Allow ES to choose an ID */);
}

void execute(Callback<Void> callback) throws IOException {
indexer.execute(callback);
HttpCall<Void> newCall() {
return indexer.newCall();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import okhttp3.RequestBody;
import okio.Buffer;
import zipkin.internal.V2StorageComponent;
import zipkin.internal.v2.storage.SpanConsumer;
import zipkin.storage.AsyncSpanStore;
import zipkin.storage.SpanStore;
import zipkin.storage.StorageAdapters;
Expand Down Expand Up @@ -220,7 +221,7 @@ public final Builder dateSeparator(char dateSeparator) {
}
}

@Override protected zipkin.internal.v2.storage.AsyncSpanConsumer v2AsyncSpanConsumer() {
@Override protected SpanConsumer v2AsyncSpanConsumer() {
ensureIndexTemplates();
return new ElasticsearchHttpSpanConsumer(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,23 @@ void writeDocument(byte[] document) {
}

/** Creates a bulk request when there is more than one object to store */
void execute(Callback<Void> callback) {
HttpCall<Void> newCall(){
HttpUrl url = pipeline != null
? http.baseUrl.newBuilder("_bulk").addQueryParameter("pipeline", pipeline).build()
: http.baseUrl.resolve("_bulk");

Request request = new Request.Builder().url(url).tag(tag)
.post(RequestBody.create(APPLICATION_JSON, body.readByteString())).build();

http.<Void>newCall(request, b -> {
return http.newCall(request, b -> {
String content = b.readUtf8();
if (content.indexOf("\"errors\":true") != -1) {
throw new IllegalStateException(content);
}
if (indices.isEmpty()) return null;
ElasticsearchHttpStorage.flush(http, join(indices));
return null;
}).submit(callback);
});
}

static String join(Collection<String> parts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import java.io.Closeable;
import java.io.IOException;
import okhttp3.Call;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
Expand All @@ -26,11 +25,12 @@
import okio.GzipSource;
import okio.Okio;
import zipkin.internal.CallbackCaptor;
import zipkin.storage.Callback;
import zipkin.internal.v2.Call;
import zipkin.internal.v2.Callback;

import static zipkin.internal.Util.propagateIfFatal;

public final class HttpCall<V> {
public final class HttpCall<V> extends Call<V> {

public interface BodyConverter<V> {
V convert(BufferedSource content) throws IOException;
Expand All @@ -46,7 +46,7 @@ public Factory(OkHttpClient ok, HttpUrl baseUrl) {
}

public <V> HttpCall<V> newCall(Request request, BodyConverter<V> bodyConverter) {
return new HttpCall(this, request, bodyConverter);
return new HttpCall<>(this, request, bodyConverter);
}

public <V> V execute(Request request, BodyConverter<V> bodyConverter) {
Expand All @@ -60,57 +60,108 @@ public <V> V execute(Request request, BodyConverter<V> bodyConverter) {
}
}

final Call.Factory ok;
final Request request;
final okhttp3.Call call;
final BodyConverter<V> bodyConverter;

HttpCall(Factory factory, Request request, BodyConverter<V> bodyConverter) {
this.ok = factory.ok;
this.request = request;
this(factory.ok.newCall(request), bodyConverter);
}

HttpCall(okhttp3.Call call, BodyConverter<V> bodyConverter) {
this.call = call;
this.bodyConverter = bodyConverter;
}

public void submit(Callback<V> delegate) {
ok.newCall(request).enqueue(new CallbackAdapter<>(bodyConverter, delegate));
@Override public V execute() throws IOException {
return parseResponse(call.execute(), bodyConverter);
}

static class CallbackAdapter<V> implements okhttp3.Callback {
@Override public void enqueue(Callback<V> delegate) {
call.enqueue(new V2CallbackAdapter<>(bodyConverter, delegate));
}

public void submit(zipkin.storage.Callback<V> delegate) {
call.enqueue(new CallbackAdapter<>(bodyConverter, delegate));
}

@Override public void cancel() {
call.cancel();
}

@Override public boolean isCanceled() {
return call.isCanceled();
}

@Override public HttpCall<V> clone() {
return new HttpCall<V>(call.clone(), bodyConverter);
}

static class V2CallbackAdapter<V> implements okhttp3.Callback {
final BodyConverter<V> bodyConverter;
final Callback<V> delegate;

CallbackAdapter(BodyConverter<V> bodyConverter, Callback<V> delegate) {
V2CallbackAdapter(BodyConverter<V> bodyConverter, Callback<V> delegate) {
this.bodyConverter = bodyConverter;
this.delegate = delegate;
}

@Override public void onFailure(Call call, IOException e) {
@Override public void onFailure(okhttp3.Call call, IOException e) {
delegate.onError(e);
}

/** Note: this runs on the {@link okhttp3.OkHttpClient#dispatcher() dispatcher} thread! */
@Override public void onResponse(Call call, Response response) {
if (!HttpHeaders.hasBody(response)) {
if (response.isSuccessful()) {
delegate.onSuccess(null);
} else {
delegate.onError(new IllegalStateException("response failed: " + response));
}
return;
@Override public void onResponse(okhttp3.Call call, Response response) {
try {
delegate.onSuccess(parseResponse(response, bodyConverter));
} catch (Throwable e) {
propagateIfFatal(e);
delegate.onError(e);
}
}
}

static class CallbackAdapter<V> implements okhttp3.Callback {
final HttpCall.BodyConverter<V> bodyConverter;
final zipkin.storage.Callback<V> delegate;

CallbackAdapter(HttpCall.BodyConverter<V> bodyConverter, zipkin.storage.Callback<V> delegate) {
this.bodyConverter = bodyConverter;
this.delegate = delegate;
}

@Override public void onFailure(okhttp3.Call call, IOException e) {
delegate.onError(e);
}

/** Note: this runs on the {@link okhttp3.OkHttpClient#dispatcher() dispatcher} thread! */
@Override public void onResponse(okhttp3.Call call, Response response) {
try {
delegate.onSuccess(parseResponse(response, bodyConverter));
} catch (Throwable e) {
propagateIfFatal(e);
delegate.onError(e);
}
}
}

static <V> V parseResponse(Response response, BodyConverter<V> bodyConverter) throws IOException {
if (!HttpHeaders.hasBody(response)) {
if (response.isSuccessful()) {
return null;
} else {
throw new IllegalStateException("response failed: " + response);
}
}
try (ResponseBody responseBody = response.body()) {
BufferedSource content = responseBody.source();
if ("gzip".equalsIgnoreCase(response.header("Content-Encoding"))) {
content = Okio.buffer(new GzipSource(responseBody.source()));
}
try (ResponseBody responseBody = response.body()) {
BufferedSource content = responseBody.source();
if ("gzip".equalsIgnoreCase(response.header("Content-Encoding"))) {
content = Okio.buffer(new GzipSource(responseBody.source()));
}
if (response.isSuccessful()) {
delegate.onSuccess(bodyConverter.convert(content));
} else {
delegate.onError(new IllegalStateException(
"response for " + response.request().tag() + " failed: " + content.readUtf8()));
}
} catch (Throwable t) {
propagateIfFatal(t);
delegate.onError(t);
if (response.isSuccessful()) {
return bodyConverter.convert(content);
} else {
throw new IllegalStateException(
"response for " + response.request().tag() + " failed: " + content.readUtf8());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.junit.Rule;
import org.junit.Test;
import zipkin.TestObjects;
import zipkin.internal.CallbackCaptor;
import zipkin.internal.v2.Span;
import zipkin.internal.v2.Span.Kind;
import zipkin.internal.v2.codec.Decoder;
Expand Down Expand Up @@ -213,8 +212,6 @@ public void close() throws IOException {
}

void accept(Span... spans) throws Exception {
CallbackCaptor<Void> callback = new CallbackCaptor<>();
storage.v2AsyncSpanConsumer().accept(asList(spans), callback);
callback.get();
storage.v2AsyncSpanConsumer().accept(asList(spans)).execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package zipkin.storage.elasticsearch.http;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import zipkin.Codec;
import zipkin.DependencyLink;
Expand All @@ -32,9 +33,11 @@ public static void writeDependencyLinks(ElasticsearchHttpStorage es, List<Depend
byte[] document = Codec.JSON.writeDependencyLink(link);
indexer.add(index, DEPENDENCY, document, link.parent + "|" + link.child); // Unique constraint
}
CallbackCaptor<Void> callback = new CallbackCaptor<>();
indexer.execute(callback);
callback.get();
try {
indexer.newCall().execute();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public static void clear(ElasticsearchHttpStorage es) throws IOException {
Expand Down
33 changes: 33 additions & 0 deletions zipkin/src/main/java/zipkin/internal/V2CallbackAdapter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.internal;

import javax.annotation.Nullable;
import zipkin.storage.Callback;

final class V2CallbackAdapter implements zipkin.internal.v2.Callback<Void> {
private final Callback<Void> callback;

V2CallbackAdapter(Callback<Void> callback) {
this.callback = callback;
}

@Override public void onSuccess(@Nullable Void value) {
callback.onSuccess(value);
}

@Override public void onError(Throwable t) {
callback.onError(t);
}
}
2 changes: 1 addition & 1 deletion zipkin/src/main/java/zipkin/internal/V2Collector.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void acceptSpans(byte[] serializedSpans, Decoder<Span> decoder, Callback<
}

@Override protected void record(List<Span> sampled, Callback<Void> callback) {
storage.v2AsyncSpanConsumer().accept(sampled, callback);
storage.v2AsyncSpanConsumer().accept(sampled).enqueue(new V2CallbackAdapter(callback));
}

@Override protected String idString(Span span) {
Expand Down
Loading