Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds v2 http query api #1710

Merged
merged 1 commit into from
Aug 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 108 additions & 25 deletions zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package zipkin.junit;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;
Expand All @@ -30,23 +31,28 @@
import zipkin.collector.Collector;
import zipkin.collector.CollectorMetrics;
import zipkin.internal.V2JsonSpanDecoder;
import zipkin.internal.V2StorageComponent;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.internal.Platform;
import zipkin.storage.Callback;
import zipkin.storage.QueryRequest;
import zipkin.storage.SpanStore;
import zipkin.storage.StorageComponent;

import static zipkin.internal.Util.lowerHexToUnsignedLong;

final class ZipkinDispatcher extends Dispatcher {
static final long DEFAULT_LOOKBACK = 86400000L; // 1 day in millis
static final SpanDecoder JSON2_DECODER = new V2JsonSpanDecoder();

private final SpanStore store;
private final zipkin.internal.v2.storage.SpanStore store2;
private final Collector consumer;
private final CollectorMetrics metrics;
private final MockWebServer server;

ZipkinDispatcher(StorageComponent storage, CollectorMetrics metrics, MockWebServer server) {
ZipkinDispatcher(V2StorageComponent storage, CollectorMetrics metrics, MockWebServer server) {
this.store = storage.spanStore();
this.store2 = storage.v2SpanStore();
this.consumer = Collector.builder(getClass()).storage(storage).metrics(metrics).build();
this.metrics = metrics;
this.server = server;
Expand All @@ -58,27 +64,14 @@ public MockResponse dispatch(RecordedRequest request) {
if (request.getMethod().equals("GET")) {
if (url.encodedPath().equals("/health")) {
return new MockResponse().setBody("OK\n");
} else if (url.encodedPath().equals("/api/v1/services")) {
return jsonResponse(Codec.JSON.writeStrings(store.getServiceNames()));
} else if (url.encodedPath().equals("/api/v1/spans")) {
String serviceName = url.queryParameter("serviceName");
return jsonResponse(Codec.JSON.writeStrings(store.getSpanNames(serviceName)));
} else if (url.encodedPath().equals("/api/v1/dependencies")) {
Long endTs = maybeLong(url.queryParameter("endTs"));
Long lookback = maybeLong(url.queryParameter("lookback"));
List<DependencyLink> result = store.getDependencies(endTs, lookback);
return jsonResponse(Codec.JSON.writeDependencyLinks(result));
} else if (url.encodedPath().equals("/api/v1/traces")) {
QueryRequest queryRequest = toQueryRequest(url);
return jsonResponse(Codec.JSON.writeTraces(store.getTraces(queryRequest)));
} else if (url.encodedPath().startsWith("/api/v1/trace/")) {
String traceIdHex = url.encodedPath().replace("/api/v1/trace/", "");
long traceIdHigh = traceIdHex.length() == 32 ? lowerHexToUnsignedLong(traceIdHex, 0) : 0L;
long traceIdLow = lowerHexToUnsignedLong(traceIdHex);
List<Span> trace = url.queryParameterNames().contains("raw")
? store.getRawTrace(traceIdHigh, traceIdLow)
: store.getTrace(traceIdHigh, traceIdLow);
if (trace != null) return jsonResponse(Codec.JSON.writeSpans(trace));
} else if (url.encodedPath().startsWith("/api/v1/")) {
return queryV1(url);
} else if (url.encodedPath().startsWith("/api/v2/")) {
try {
return queryV2(url);
} catch (IOException e) {
throw Platform.get().uncheckedIOException(e);
}
}
} else if (request.getMethod().equals("POST")) {
if (url.encodedPath().equals("/api/v1/spans")) {
Expand All @@ -96,6 +89,81 @@ public MockResponse dispatch(RecordedRequest request) {
return new MockResponse().setResponseCode(404);
}

MockResponse queryV1(HttpUrl url) {
if (url.encodedPath().equals("/api/v1/services")) {
return jsonResponse(Codec.JSON.writeStrings(store.getServiceNames()));
} else if (url.encodedPath().equals("/api/v1/spans")) {
String serviceName = url.queryParameter("serviceName");
return jsonResponse(Codec.JSON.writeStrings(store.getSpanNames(serviceName)));
} else if (url.encodedPath().equals("/api/v1/dependencies")) {
Long endTs = maybeLong(url.queryParameter("endTs"));
Long lookback = maybeLong(url.queryParameter("lookback"));
List<DependencyLink> result = store.getDependencies(endTs, lookback);
return jsonResponse(Codec.JSON.writeDependencyLinks(result));
} else if (url.encodedPath().equals("/api/v1/traces")) {
QueryRequest queryRequest = toQueryRequest(url);
return jsonResponse(Codec.JSON.writeTraces(store.getTraces(queryRequest)));
} else if (url.encodedPath().startsWith("/api/v1/trace/")) {
String traceIdHex = url.encodedPath().replace("/api/v1/trace/", "");
long traceIdHigh = traceIdHex.length() == 32 ? lowerHexToUnsignedLong(traceIdHex, 0) : 0L;
long traceIdLow = lowerHexToUnsignedLong(traceIdHex);
List<Span> trace = url.queryParameterNames().contains("raw")
? store.getRawTrace(traceIdHigh, traceIdLow)
: store.getTrace(traceIdHigh, traceIdLow);
if (trace != null) return jsonResponse(Codec.JSON.writeSpans(trace));
}
return new MockResponse().setResponseCode(404);
}

MockResponse queryV2(HttpUrl url) throws IOException {
if (url.encodedPath().equals("/api/v2/services")) {
return jsonResponse(Codec.JSON.writeStrings(store2.getServiceNames().execute()));
} else if (url.encodedPath().equals("/api/v2/spans")) {
String serviceName = url.queryParameter("serviceName");
return jsonResponse(Codec.JSON.writeStrings(store2.getSpanNames(serviceName).execute()));
} else if (url.encodedPath().equals("/api/v2/dependencies")) {
Long endTs = maybeLong(url.queryParameter("endTs"));
Long lookback = maybeLong(url.queryParameter("lookback"));
List<DependencyLink> result = store2.getDependencies(
endTs != null ? endTs : System.currentTimeMillis(),
lookback != null ? lookback : DEFAULT_LOOKBACK
).execute();
return jsonResponse(Codec.JSON.writeDependencyLinks(result));
} else if (url.encodedPath().equals("/api/v2/traces")) {
List<List<zipkin.internal.v2.Span>> traces = store2.getTraces(toQueryRequest2(url)).execute();
ByteArrayOutputStream bout = new ByteArrayOutputStream();
bout.write('[');
for (int i = 0, length = traces.size(); i < length; ) {
List<zipkin.internal.v2.Span> trace = traces.get(i);
writeTrace(bout, trace);
if (++i < length) bout.write(',');
}
bout.write(']');
return jsonResponse(bout.toByteArray());
} else if (url.encodedPath().startsWith("/api/v2/trace/")) {
String traceIdHex = url.encodedPath().replace("/api/v2/trace/", "");
long traceIdHigh = traceIdHex.length() == 32 ? lowerHexToUnsignedLong(traceIdHex, 0) : 0L;
long traceIdLow = lowerHexToUnsignedLong(traceIdHex);
List<zipkin.internal.v2.Span> trace = store2.getTrace(traceIdHigh, traceIdLow).execute();
if (!trace.isEmpty()) {
ByteArrayOutputStream bout = new ByteArrayOutputStream();
writeTrace(bout, trace);
return jsonResponse(bout.toByteArray());
}
}
return new MockResponse().setResponseCode(404);
}

static void writeTrace(ByteArrayOutputStream bout, List<zipkin.internal.v2.Span> trace)
throws IOException {
bout.write('[');
for (int i = 0, length = trace.size(); i < length; ) {
bout.write(Encoder.JSON.encode(trace.get(i)));
if (++i < length) bout.write(',');
}
bout.write(']');
}

MockResponse acceptSpans(RecordedRequest request, SpanDecoder decoder) {
metrics.incrementMessages();
byte[] body = request.getBody().readByteArray();
Expand Down Expand Up @@ -137,11 +205,26 @@ static QueryRequest toQueryRequest(HttpUrl url) {
.limit(maybeInteger(url.queryParameter("limit"))).build();
}

static Long maybeLong(@Nullable String input) {
static zipkin.internal.v2.storage.QueryRequest toQueryRequest2(HttpUrl url) {
Long endTs = maybeLong(url.queryParameter("endTs"));
Long lookback = maybeLong(url.queryParameter("lookback"));
Integer limit = maybeInteger(url.queryParameter("limit"));
return zipkin.internal.v2.storage.QueryRequest.newBuilder()
.serviceName(url.queryParameter("serviceName"))
.spanName(url.queryParameter("spanName"))
.parseAnnotationQuery(url.queryParameter("annotationQuery"))
.minDuration(maybeLong(url.queryParameter("minDuration")))
.maxDuration(maybeLong(url.queryParameter("maxDuration")))
.endTs(endTs != null ? endTs : System.currentTimeMillis())
.lookback(lookback != null ? lookback : DEFAULT_LOOKBACK)
.limit(limit != null ? limit : 10).build();
}

static @Nullable Long maybeLong(@Nullable String input) {
return input != null ? Long.valueOf(input) : null;
}

static Integer maybeInteger(@Nullable String input) {
static @Nullable Integer maybeInteger(@Nullable String input) {
return input != null ? Integer.valueOf(input) : null;
}

Expand Down
27 changes: 27 additions & 0 deletions zipkin-junit/src/test/java/zipkin/junit/v2/HttpException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.junit.v2;

final class HttpException extends RuntimeException {
final int code;

HttpException(String message, int code) {
super(message);
this.code = code;
}

int code() {
return code;
}
}
134 changes: 134 additions & 0 deletions zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2Call.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/**
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.junit.v2;

import java.io.Closeable;
import java.io.IOException;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.internal.http.HttpHeaders;
import okio.BufferedSource;
import okio.GzipSource;
import okio.Okio;
import zipkin.internal.v2.Call;
import zipkin.internal.v2.Callback;

import static zipkin.internal.Util.propagateIfFatal;

public final class HttpV2Call<V> extends Call<V> {

public interface BodyConverter<V> {
V convert(BufferedSource content) throws IOException;
}

public static class Factory implements Closeable {
final OkHttpClient ok;
public final HttpUrl baseUrl;

public Factory(OkHttpClient ok, HttpUrl baseUrl) {
this.ok = ok;
this.baseUrl = baseUrl;
}

public <V> HttpV2Call<V> newCall(Request request, BodyConverter<V> bodyConverter) {
return new HttpV2Call<>(this, request, bodyConverter);
}

@Override public void close() {
ok.dispatcher().executorService().shutdownNow();
}
}

final okhttp3.Call call;
final BodyConverter<V> bodyConverter;

HttpV2Call(Factory factory, Request request, BodyConverter<V> bodyConverter) {
this(factory.ok.newCall(request), bodyConverter);
}

HttpV2Call(okhttp3.Call call, BodyConverter<V> bodyConverter) {
this.call = call;
this.bodyConverter = bodyConverter;
}

@Override public V execute() throws IOException {
return parseResponse(call.execute(), bodyConverter);
}

@Override public void enqueue(Callback<V> delegate) {
call.enqueue(new CallbackAdapter<>(bodyConverter, delegate));
}

@Override public void cancel() {
call.cancel();
}

@Override public boolean isCanceled() {
return call.isCanceled();
}

@Override public HttpV2Call<V> clone() {
return new HttpV2Call<>(call.clone(), bodyConverter);
}

static class CallbackAdapter<V> implements okhttp3.Callback {
final BodyConverter<V> bodyConverter;
final Callback<V> delegate;

CallbackAdapter(BodyConverter<V> bodyConverter, Callback<V> delegate) {
this.bodyConverter = bodyConverter;
this.delegate = delegate;
}

@Override public void onFailure(okhttp3.Call call, IOException e) {
delegate.onError(e);
}

/** Note: this runs on the {@link okhttp3.OkHttpClient#dispatcher() dispatcher} thread! */
@Override public void onResponse(okhttp3.Call call, Response response) {
try {
delegate.onSuccess(parseResponse(response, bodyConverter));
} catch (Throwable e) {
propagateIfFatal(e);
delegate.onError(e);
}
}
}

static <V> V parseResponse(Response response, BodyConverter<V> bodyConverter) throws IOException {
if (!HttpHeaders.hasBody(response)) {
if (response.isSuccessful()) {
return null;
} else {
throw new HttpException("response failed: " + response, response.code());
}
}
try (ResponseBody responseBody = response.body()) {
BufferedSource content = responseBody.source();
if ("gzip".equalsIgnoreCase(response.header("Content-Encoding"))) {
content = Okio.buffer(new GzipSource(responseBody.source()));
}
if (response.isSuccessful()) {
return bodyConverter.convert(content);
} else {
String tag = response.request().tag().toString();
throw new HttpException("response for " + tag + " failed: " + content.readUtf8(),
response.code());
}
}
}
}
49 changes: 49 additions & 0 deletions zipkin-junit/src/test/java/zipkin/junit/v2/HttpV2SpanConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin.junit.v2;

import java.util.List;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okio.Buffer;
import zipkin.internal.v2.Span;
import zipkin.internal.v2.codec.Encoder;
import zipkin.internal.v2.storage.SpanConsumer;

/** Implements the span consumer interface by forwarding requests over http. */
final class HttpV2SpanConsumer implements SpanConsumer {
final HttpV2Call.Factory factory;

HttpV2SpanConsumer(OkHttpClient client, HttpUrl baseUrl) {
this.factory = new HttpV2Call.Factory(client, baseUrl);
}

@Override public zipkin.internal.v2.Call<Void> accept(List<Span> spans) {
Buffer json = new Buffer();
json.writeByte('[');
for (int i = 0, length = spans.size(); i < length; ) {
json.write(Encoder.JSON.encode(spans.get(i)));
if (++i < length) json.writeByte(',');
}
json.writeByte(']');
return factory.newCall(new Request.Builder()
.url(factory.baseUrl.resolve("/api/v2/spans"))
.post(RequestBody.create(MediaType.parse("application/json"), json.readByteArray())).build(),
b -> null /* void */
);
}
}
Loading