diff --git a/core/src/main/java/zipkin2/reporter/HttpEndpointSupplier.java b/core/src/main/java/zipkin2/reporter/HttpEndpointSupplier.java
index 54d9fb3c..f18d46dd 100644
--- a/core/src/main/java/zipkin2/reporter/HttpEndpointSupplier.java
+++ b/core/src/main/java/zipkin2/reporter/HttpEndpointSupplier.java
@@ -20,9 +20,9 @@
* HTTP-based {@link BytesMessageSender senders} use this to resolve a potentially-pseudo endpoint
* passed by configuration to a real endpoint.
*
- * Usage Notes
+ * Implementation Notes
*
- * {@link BytesMessageSender senders} should implement the following logic:
+ * {@linkplain HttpSender} is a convenience type that implements the following logic:
*
* - During build, the sender should invoke the {@linkplain Factory}.
* - If the result is {@link ConstantHttpEndpointSupplier}, build the sender to use a static
@@ -32,8 +32,6 @@
*
- Call {@link #close()} once during {@link BytesMessageSender#close()}.
*
*
- * Implementation Notes
- *
* Implement friendly {@code toString()} functions, that include the real endpoint or the one
* passed to the {@linkplain Factory}.
*
@@ -46,6 +44,8 @@
* dependency injection, without limiting an HTTP framework that can handle groups, to a
* single-endpoint supplier.
*
+ * @see ConstantHttpEndpointSupplier
+ * @see HttpSender
* @since 3.3
*/
public interface HttpEndpointSupplier extends Closeable {
diff --git a/core/src/main/java/zipkin2/reporter/HttpSender.java b/core/src/main/java/zipkin2/reporter/HttpSender.java
new file mode 100644
index 00000000..329acf2d
--- /dev/null
+++ b/core/src/main/java/zipkin2/reporter/HttpSender.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2016-2024 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin2.reporter;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+import zipkin2.reporter.HttpEndpointSupplier.Factory;
+
+import static zipkin2.reporter.Call.propagateIfFatal;
+
+/**
+ * Reports spans to Zipkin, using its POST endpoint.
+ *
+ *
Calls to {@linkplain #postSpans(Object, Object)} happen on the same async reporting thread,
+ * but {@linkplain #close()} might be called from any thread.
+ *
+ * @since 3.3
+ */
+public abstract class HttpSender extends BytesMessageSender.Base {
+ final Logger logger;
+ final HttpEndpointSupplier endpointSupplier;
+ final U endpoint;
+
+ /** close is typically called from a different thread */
+ final AtomicBoolean closeCalled = new AtomicBoolean();
+
+ /**
+ * Called each invocation of {@linkplain #postSpans(Object, Object)}, unless the
+ * {@linkplain HttpEndpointSupplier} is a {@linkplain ConstantHttpEndpointSupplier},
+ * Implementations should perform any validation needed here.
+ *
+ * @since 3.3
+ */
+ protected abstract U newEndpoint(String endpoint);
+
+ /**
+ * Creates a new POST body from the encoded spans.
+ *
+ *
Below is the simplest implementation, when {@linkplain HttpSender#} is a byte array.
+ * {@code
+ * @Override protected byte[] newBody(List encodedSpans) {
+ * return encoding.encode(encodedSpans);
+ * }
+ * }
+ *
+ *
If you need the "Content-Type" value, you can access it via {@link Encoding#mediaType()}.
+ *
+ * @since 3.3
+ */
+ protected abstract B newBody(List encodedSpans);
+
+ /**
+ * Implement to POST spans to the given endpoint.
+ *
+ * If you need the "Content-Type" value, you can access it via {@link Encoding#mediaType()}.
+ *
+ * @since 3.3
+ */
+ protected abstract void postSpans(U endpoint, B body) throws IOException;
+
+ /**
+ * Override to close any resources.
+ *
+ * @since 3.3
+ */
+ protected void doClose() {
+ }
+
+ protected HttpSender(Encoding encoding, Factory endpointSupplierFactory, String endpoint) {
+ this(Logger.getLogger(HttpSender.class.getName()), encoding, endpointSupplierFactory, endpoint);
+ }
+
+ HttpSender(Logger logger, Encoding encoding, Factory endpointSupplierFactory, String endpoint) {
+ super(encoding);
+ this.logger = logger;
+ if (endpointSupplierFactory == null) {
+ throw new NullPointerException("endpointSupplierFactory == null");
+ }
+ if (endpoint == null) throw new NullPointerException("endpoint == null");
+
+ HttpEndpointSupplier endpointSupplier = endpointSupplierFactory.create(endpoint);
+ if (endpointSupplier == null) {
+ throw new NullPointerException("endpointSupplierFactory.create() returned null");
+ }
+ if (endpointSupplier instanceof ConstantHttpEndpointSupplier) {
+ this.endpoint = nextEndpoint(endpointSupplier);
+ closeQuietly(endpointSupplier);
+ this.endpointSupplier = null;
+ } else {
+ this.endpoint = null;
+ this.endpointSupplier = endpointSupplier;
+ }
+ }
+
+ final U nextEndpoint(HttpEndpointSupplier endpointSupplier) {
+ String endpoint = endpointSupplier.get(); // eagerly resolve the endpoint
+ if (endpoint == null) throw new NullPointerException("endpointSupplier.get() returned null");
+ return newEndpoint(endpoint);
+ }
+
+ /** Defaults to the most common max message size: 512KB. */
+ @Override public int messageMaxBytes() {
+ return 512 * 1024;
+ }
+
+ /** Sends spans as an HTTP POST request. */
+ @Override public final void send(List encodedSpans) throws IOException {
+ if (closeCalled.get()) throw new ClosedSenderException();
+ U endpoint = this.endpoint;
+ if (endpoint == null) endpoint = nextEndpoint(endpointSupplier);
+ B body = newBody(encodedSpans);
+ if (body == null) throw new NullPointerException("newBody(encodedSpans) returned null");
+ postSpans(endpoint, newBody(encodedSpans));
+ }
+
+ @Override public final void close() {
+ if (!closeCalled.compareAndSet(false, true)) return; // already closed
+ closeQuietly(endpointSupplier);
+ doClose();
+ }
+
+ final void closeQuietly(HttpEndpointSupplier endpointSupplier) {
+ if (endpointSupplier == null) return;
+ try {
+ endpointSupplier.close();
+ } catch (Throwable t) {
+ propagateIfFatal(t);
+ logger.fine("ignoring error closing endpoint supplier: " + t.getMessage());
+ }
+ }
+
+ @Override public String toString() {
+ String name = getClass().getSimpleName();
+ if (endpoint != null) {
+ return name + "{" + endpoint + "}";
+ }
+ return name + "{" + endpointSupplier + "}";
+ }
+}
diff --git a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/Platform.java b/core/src/main/java/zipkin2/reporter/internal/Platform.java
similarity index 92%
rename from okhttp3/src/main/java/zipkin2/reporter/okhttp3/Platform.java
rename to core/src/main/java/zipkin2/reporter/internal/Platform.java
index 1b2b5151..ead051ab 100644
--- a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/Platform.java
+++ b/core/src/main/java/zipkin2/reporter/internal/Platform.java
@@ -11,23 +11,23 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-package zipkin2.reporter.okhttp3;
+package zipkin2.reporter.internal;
import java.io.IOException;
import java.lang.reflect.Constructor;
/** Taken from {@code zipkin2.reporter.internal.Platform} to avoid needing to shade over a single method. */
-abstract class Platform {
+public abstract class Platform {
private static final Platform PLATFORM = findPlatform();
Platform() {
}
- RuntimeException uncheckedIOException(IOException e) {
+ public RuntimeException uncheckedIOException(IOException e) {
return new RuntimeException(e);
}
- static Platform get() {
+ public static Platform get() {
return PLATFORM;
}
diff --git a/core/src/main/java/zipkin2/reporter/internal/SenderAdapter.java b/core/src/main/java/zipkin2/reporter/internal/SenderAdapter.java
new file mode 100644
index 00000000..bdab99e0
--- /dev/null
+++ b/core/src/main/java/zipkin2/reporter/internal/SenderAdapter.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2016-2024 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin2.reporter.internal;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import zipkin2.reporter.BytesMessageSender;
+import zipkin2.reporter.Call;
+import zipkin2.reporter.Callback;
+import zipkin2.reporter.CheckResult;
+import zipkin2.reporter.Encoding;
+import zipkin2.reporter.Sender;
+
+/**
+ * Reduces burden on types that need to extend {@linkplain Sender}.
+ */
+public abstract class SenderAdapter extends Sender {
+ protected abstract BytesMessageSender delegate();
+
+ @Override public final int messageSizeInBytes(List encodedSpans) {
+ return delegate().messageSizeInBytes(encodedSpans);
+ }
+
+ @Override public final int messageSizeInBytes(int encodedSizeInBytes) {
+ return delegate().messageSizeInBytes(encodedSizeInBytes);
+ }
+
+ @Override public final Encoding encoding() {
+ return delegate().encoding();
+ }
+
+ @Override public final int messageMaxBytes() {
+ return delegate().messageMaxBytes();
+ }
+
+ @Override @Deprecated public final Call sendSpans(List encodedSpans) {
+ return new SendSpans(encodedSpans);
+ }
+
+ @Override public final void send(List encodedSpans) throws IOException {
+ delegate().send(encodedSpans);
+ }
+
+ @Override @Deprecated public final CheckResult check() {
+ try {
+ delegate().send(Collections.emptyList());
+ return CheckResult.OK;
+ } catch (Throwable e) {
+ Call.propagateIfFatal(e);
+ return CheckResult.failed(e);
+ }
+ }
+
+ @Override public final void close() {
+ try {
+ delegate().close();
+ } catch (IOException e) {
+ throw Platform.get().uncheckedIOException(e);
+ }
+ }
+
+ @Override public final String toString() {
+ return delegate().toString();
+ }
+
+ final class SendSpans extends Call.Base {
+ private final List encodedSpans;
+
+ SendSpans(List encodedSpans) {
+ this.encodedSpans = encodedSpans;
+ }
+
+ @Override protected Void doExecute() throws IOException {
+ send(encodedSpans);
+ return null;
+ }
+
+ @Override protected void doEnqueue(Callback callback) {
+ try {
+ send(encodedSpans);
+ callback.onSuccess(null);
+ } catch (Throwable t) {
+ Call.propagateIfFatal(t);
+ callback.onError(t);
+ }
+ }
+
+ @Override public Call clone() {
+ return new SendSpans(encodedSpans);
+ }
+ }
+}
diff --git a/core/src/test/java/zipkin2/reporter/FakeHttpSender.java b/core/src/test/java/zipkin2/reporter/FakeHttpSender.java
new file mode 100644
index 00000000..0f50275f
--- /dev/null
+++ b/core/src/test/java/zipkin2/reporter/FakeHttpSender.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2016-2024 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin2.reporter;
+
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.logging.Logger;
+import zipkin2.Span;
+import zipkin2.codec.SpanBytesDecoder;
+import zipkin2.reporter.HttpEndpointSupplier.Factory;
+
+class FakeHttpSender extends HttpSender {
+ final String originalEndpoint;
+ final Consumer> onSpans;
+
+ FakeHttpSender(Logger logger, String endpoint, Consumer> onSpans) {
+ this(logger, endpoint, ConstantHttpEndpointSupplier.FACTORY, onSpans);
+ }
+
+ FakeHttpSender(Logger logger, String endpoint, Factory endpointSupplierFactory,
+ Consumer> onSpans) {
+ super(logger, Encoding.JSON, endpointSupplierFactory, endpoint);
+ this.originalEndpoint = endpoint;
+ this.onSpans = onSpans;
+ }
+
+ FakeHttpSender withHttpEndpointSupplierFactory(Factory endpointSupplierFactory) {
+ return new FakeHttpSender(logger, originalEndpoint, endpointSupplierFactory, onSpans);
+ }
+
+ /** close is typically called from a different thread */
+ volatile boolean closeCalled;
+
+ @Override protected String newEndpoint(String endpoint) {
+ try {
+ return URI.create(endpoint).toURL().toString(); // validate
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+
+ @Override protected byte[] newBody(List encodedSpans) {
+ return encoding.encode(encodedSpans);
+ }
+
+ @Override protected void postSpans(String endpoint, byte[] body) {
+ List decoded = SpanBytesDecoder.JSON_V2.decodeList(body);
+ onSpans.accept(decoded);
+ }
+
+ @Override protected void doClose() {
+ closeCalled = true;
+ }
+}
diff --git a/core/src/test/java/zipkin2/reporter/HttpSenderTest.java b/core/src/test/java/zipkin2/reporter/HttpSenderTest.java
new file mode 100644
index 00000000..8d0df09a
--- /dev/null
+++ b/core/src/test/java/zipkin2/reporter/HttpSenderTest.java
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2016-2024 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin2.reporter;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.logging.Logger;
+import java.util.stream.Stream;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import zipkin2.Span;
+import zipkin2.codec.SpanBytesEncoder;
+
+import static java.util.stream.Collectors.toList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static zipkin2.TestObjects.CLIENT_SPAN;
+
+@ExtendWith(MockitoExtension.class)
+class HttpSenderTest {
+
+ @Mock Logger logger;
+ @Mock Consumer> onSpans;
+ FakeHttpSender sender;
+
+ @BeforeEach
+ void newSender() {
+ sender = new FakeHttpSender(logger, "http://localhost:19092", onSpans);
+ }
+
+ @Test void illegalToSendWhenClosed() {
+ sender.close();
+
+ assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN))
+ .isInstanceOf(ClosedSenderException.class);
+ }
+
+ @Test void endpointSupplierFactory_defaultsToConstant() {
+ // The default connection supplier returns a constant URL
+ assertThat(sender.endpoint)
+ .isEqualTo("http://localhost:19092");
+ }
+
+ @Test void endpointSupplierFactory_constant() {
+ sender.close();
+ sender = sender.withHttpEndpointSupplierFactory(
+ e -> ConstantHttpEndpointSupplier.create("http://localhost:29092")
+ );
+
+ // The connection supplier has a constant URL
+ assertThat(sender.endpoint)
+ .isEqualTo("http://localhost:29092");
+ }
+
+ @Test void endpointSupplierFactory_constantBad() {
+ HttpEndpointSupplier.Factory badFactory =
+ e -> ConstantHttpEndpointSupplier.create("htp://localhost:9411/api/v1/spans");
+
+ assertThatThrownBy(() -> sender.withHttpEndpointSupplierFactory(badFactory))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("unknown protocol: htp");
+ }
+
+ @Test void endpointSupplierFactory_dynamic() {
+ AtomicInteger closeCalled = new AtomicInteger();
+ HttpEndpointSupplier dynamicEndpointSupplier = new HttpEndpointSupplier() {
+ @Override public String get() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public void close() {
+ closeCalled.incrementAndGet();
+ }
+ };
+
+ sender.close();
+ sender = sender.withHttpEndpointSupplierFactory(e -> dynamicEndpointSupplier);
+
+ // The connection supplier is deferred until send
+ assertThat(sender.endpointSupplier)
+ .isEqualTo(dynamicEndpointSupplier);
+
+ assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN))
+ .isInstanceOf(UnsupportedOperationException.class);
+
+ // Ensure that closing the sender closes the endpoint supplier
+ sender.close();
+ sender.close(); // check only closed once
+ assertThat(closeCalled).hasValue(1);
+ }
+
+ @Test void endpointSupplierFactory_ignoresCloseFailure() {
+ AtomicInteger closeCalled = new AtomicInteger();
+ HttpEndpointSupplier dynamicEndpointSupplier = new HttpEndpointSupplier() {
+ @Override public String get() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public void close() throws IOException {
+ closeCalled.incrementAndGet();
+ throw new IOException("unexpected");
+ }
+ };
+
+ sender.close();
+ sender = sender.withHttpEndpointSupplierFactory(e -> dynamicEndpointSupplier);
+
+ // Ensure that an exception closing the endpoint supplier doesn't propagate.
+ sender.close();
+ assertThat(closeCalled).hasValue(1);
+ }
+
+ @Test void endpointSupplierFactory_dynamicNull() {
+ sender.close();
+ sender = sender.withHttpEndpointSupplierFactory(e -> new BaseHttpEndpointSupplier() {
+ @Override public String get() {
+ return null;
+ }
+ });
+
+ assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessage("endpointSupplier.get() returned null");
+ }
+
+ @Test void endpointSupplierFactory_dynamicBad() {
+ sender.close();
+ sender = sender.withHttpEndpointSupplierFactory(e -> new BaseHttpEndpointSupplier() {
+ @Override public String get() {
+ return "htp://localhost:9411/api/v1/spans";
+ }
+ });
+
+ assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("unknown protocol: htp");
+ }
+
+ /**
+ * The output of toString() on {@link BytesMessageSender} implementations appears in thread names
+ * created by {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other
+ * monitoring tools, care should be taken to ensure the toString() output is a reasonable length
+ * and does not contain sensitive information.
+ */
+ @Test void toStringContainsOnlySummaryInformation() {
+ assertThat(sender).hasToString("FakeHttpSender{http://localhost:19092}");
+ }
+
+ static void sendSpans(BytesMessageSender sender, Span... spans) throws IOException {
+ SpanBytesEncoder bytesEncoder;
+ switch (sender.encoding()) {
+ case JSON:
+ bytesEncoder = SpanBytesEncoder.JSON_V2;
+ break;
+ case THRIFT:
+ bytesEncoder = SpanBytesEncoder.THRIFT;
+ break;
+ case PROTO3:
+ bytesEncoder = SpanBytesEncoder.PROTO3;
+ break;
+ default:
+ throw new UnsupportedOperationException("encoding: " + sender.encoding());
+ }
+ sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList()));
+ }
+
+ static abstract class BaseHttpEndpointSupplier implements HttpEndpointSupplier {
+ @Override public void close() {
+ }
+ }
+}
diff --git a/okhttp3/src/test/java/zipkin2/reporter/okhttp3/PlatformTest.java b/core/src/test/java/zipkin2/reporter/internal/PlatformTest.java
similarity index 93%
rename from okhttp3/src/test/java/zipkin2/reporter/okhttp3/PlatformTest.java
rename to core/src/test/java/zipkin2/reporter/internal/PlatformTest.java
index 978b7d86..d8a59fb4 100644
--- a/okhttp3/src/test/java/zipkin2/reporter/okhttp3/PlatformTest.java
+++ b/core/src/test/java/zipkin2/reporter/internal/PlatformTest.java
@@ -11,11 +11,12 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-package zipkin2.reporter.okhttp3;
+package zipkin2.reporter.internal;
import java.io.IOException;
import java.io.UncheckedIOException;
import org.junit.jupiter.api.Test;
+import zipkin2.reporter.internal.Platform;
import static org.assertj.core.api.Assertions.assertThat;
diff --git a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/HttpCall.java b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/HttpCall.java
deleted file mode 100644
index 818574ef..00000000
--- a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/HttpCall.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Copyright 2016-2024 The OpenZipkin Authors
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package zipkin2.reporter.okhttp3;
-
-import java.io.IOException;
-import okhttp3.Response;
-import okhttp3.ResponseBody;
-import okio.BufferedSource;
-import okio.GzipSource;
-import okio.Okio;
-import zipkin2.reporter.Call;
-import zipkin2.reporter.Callback;
-
-final class HttpCall extends Call {
-
- final okhttp3.Call call;
-
- HttpCall(okhttp3.Call call) {
- this.call = call;
- }
-
- @Override public Void execute() throws IOException {
- parseResponse(call.execute());
- return null;
- }
-
- @Override public void enqueue(Callback delegate) {
- call.enqueue(new V2CallbackAdapter<>(delegate));
- }
-
- @Override public void cancel() {
- call.cancel();
- }
-
- @Override public boolean isCanceled() {
- return call.isCanceled();
- }
-
- @Override public HttpCall clone() {
- return new HttpCall(call.clone());
- }
-
- static class V2CallbackAdapter implements okhttp3.Callback {
- final Callback delegate;
-
- V2CallbackAdapter(Callback delegate) {
- this.delegate = delegate;
- }
-
- @Override public void onFailure(okhttp3.Call call, IOException e) {
- delegate.onError(e);
- }
-
- /** Note: this runs on the {@link okhttp3.OkHttpClient#dispatcher() dispatcher} thread! */
- @Override public void onResponse(okhttp3.Call call, Response response) {
- try {
- parseResponse(response);
- delegate.onSuccess(null);
- } catch (Throwable e) {
- propagateIfFatal(e);
- delegate.onError(e);
- }
- }
- }
-
- static void parseResponse(Response response) throws IOException {
- ResponseBody responseBody = response.body();
- if (responseBody == null) {
- if (response.isSuccessful()) {
- return;
- } else {
- throw new RuntimeException("response failed: " + response);
- }
- }
- try {
- BufferedSource content = responseBody.source();
- if ("gzip".equalsIgnoreCase(response.header("Content-Encoding"))) {
- content = Okio.buffer(new GzipSource(responseBody.source()));
- }
- if (!response.isSuccessful()) {
- throw new RuntimeException(
- "response for " + response.request().tag() + " failed: " + content.readUtf8());
- }
- } finally {
- responseBody.close();
- }
- }
-}
diff --git a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/InternalOkHttpSender.java b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/InternalOkHttpSender.java
new file mode 100644
index 00000000..0ac74daa
--- /dev/null
+++ b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/InternalOkHttpSender.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2016-2024 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin2.reporter.okhttp3;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import okhttp3.Call;
+import okhttp3.Dispatcher;
+import okhttp3.HttpUrl;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import okhttp3.ResponseBody;
+import okio.Buffer;
+import okio.BufferedSink;
+import okio.BufferedSource;
+import okio.GzipSink;
+import okio.GzipSource;
+import okio.Okio;
+import zipkin2.reporter.Component;
+import zipkin2.reporter.Encoding;
+import zipkin2.reporter.HttpSender;
+
+/**
+ * We have to nest this class until v4 when {@linkplain OkHttpSender} no longer needs to extend
+ * {@linkplain Component}.
+ */
+final class InternalOkHttpSender extends HttpSender {
+ final OkHttpClient client;
+ final RequestBodyMessageEncoder encoder;
+ final Encoding encoding;
+ final int messageMaxBytes, maxRequests;
+ final boolean compressionEnabled;
+
+ InternalOkHttpSender(OkHttpSender.Builder builder) {
+ super(builder.encoding, builder.endpointSupplierFactory, builder.endpoint);
+ encoding = builder.encoding;
+ encoder = RequestBodyMessageEncoder.forEncoding(encoding);
+ maxRequests = builder.maxRequests;
+ messageMaxBytes = builder.messageMaxBytes;
+ compressionEnabled = builder.compressionEnabled;
+ Dispatcher dispatcher = newDispatcher(maxRequests);
+
+ // doing the extra "build" here prevents us from leaking our dispatcher to the builder
+ client = builder.clientBuilder().build().newBuilder().dispatcher(dispatcher).build();
+ }
+
+ @Override public int messageMaxBytes() {
+ return messageMaxBytes;
+ }
+
+ @Override protected HttpUrl newEndpoint(String endpoint) {
+ HttpUrl parsed = HttpUrl.parse(endpoint);
+ if (parsed == null) throw new IllegalArgumentException("invalid POST url: " + endpoint);
+ return parsed;
+ }
+
+ @Override protected RequestBody newBody(List encodedSpans) {
+ return encoder.encode(encodedSpans);
+ }
+
+ @Override protected void postSpans(HttpUrl endpoint, RequestBody body) throws IOException {
+ Request request = newRequest(endpoint, body);
+ Call call = client.newCall(request);
+ parseResponse(call.execute());
+ }
+
+ Request newRequest(HttpUrl endpoint, RequestBody body)
+ throws IOException {
+ Request.Builder request = new Request.Builder().url(endpoint);
+ // Amplification can occur when the Zipkin endpoint is proxied, and the proxy is instrumented.
+ // This prevents that in proxies, such as Envoy, that understand B3 single format,
+ request.addHeader("b3", "0");
+ if (compressionEnabled) {
+ request.addHeader("Content-Encoding", "gzip");
+ Buffer gzipped = new Buffer();
+ BufferedSink gzipSink = Okio.buffer(new GzipSink(gzipped));
+ body.writeTo(gzipSink);
+ gzipSink.close();
+ body = new BufferRequestBody(body.contentType(), gzipped);
+ }
+ request.post(body);
+ return request.build();
+ }
+
+ static final class BufferRequestBody extends RequestBody {
+ final MediaType contentType;
+ final Buffer body;
+
+ BufferRequestBody(MediaType contentType, Buffer body) {
+ this.contentType = contentType;
+ this.body = body;
+ }
+
+ @Override public long contentLength() {
+ return body.size();
+ }
+
+ @Override public MediaType contentType() {
+ return contentType;
+ }
+
+ @Override public void writeTo(BufferedSink sink) throws IOException {
+ sink.write(body, body.size());
+ }
+ }
+
+ static void parseResponse(Response response) throws IOException {
+ ResponseBody responseBody = response.body();
+ if (responseBody == null) {
+ if (response.isSuccessful()) {
+ return;
+ } else {
+ throw new RuntimeException("response failed: " + response);
+ }
+ }
+ try {
+ BufferedSource content = responseBody.source();
+ if ("gzip".equalsIgnoreCase(response.header("Content-Encoding"))) {
+ content = Okio.buffer(new GzipSource(responseBody.source()));
+ }
+ if (!response.isSuccessful()) {
+ throw new RuntimeException(
+ "response for " + response.request().tag() + " failed: " + content.readUtf8());
+ }
+ } finally {
+ responseBody.close();
+ }
+ }
+
+ /** Waits up to a second for in-flight requests to finish before cancelling them */
+ @Override protected void doClose() {
+ Dispatcher dispatcher = client.dispatcher();
+ dispatcher.executorService().shutdown();
+ try {
+ if (!dispatcher.executorService().awaitTermination(1, TimeUnit.SECONDS)) {
+ dispatcher.cancelAll();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ static Dispatcher newDispatcher(int maxRequests) {
+ // bound the executor so that we get consistent performance
+ ThreadPoolExecutor dispatchExecutor =
+ new ThreadPoolExecutor(0, maxRequests, 60, TimeUnit.SECONDS,
+ // Using a synchronous queue means messages will send immediately until we hit max
+ // in-flight requests. Once max requests are hit, send will block the caller, which is
+ // the AsyncReporter flush thread. This is ok, as the AsyncReporter has a buffer of
+ // unsent spans for this purpose.
+ new SynchronousQueue(),
+ OkHttpSenderThreadFactory.INSTANCE);
+
+ Dispatcher dispatcher = new Dispatcher(dispatchExecutor);
+ dispatcher.setMaxRequests(maxRequests);
+ dispatcher.setMaxRequestsPerHost(maxRequests);
+ return dispatcher;
+ }
+
+ enum OkHttpSenderThreadFactory implements ThreadFactory {
+ INSTANCE;
+
+ @Override public Thread newThread(Runnable r) {
+ return new Thread(r, "OkHttpSender Dispatcher");
+ }
+ }
+
+ @Override public String toString() {
+ return super.toString().replace("Internal", "");
+ }
+}
diff --git a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/OkHttpSender.java b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/OkHttpSender.java
index a888df1f..09c8757e 100644
--- a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/OkHttpSender.java
+++ b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/OkHttpSender.java
@@ -13,39 +13,16 @@
*/
package zipkin2.reporter.okhttp3;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Logger;
-import okhttp3.Call;
-import okhttp3.Dispatcher;
import okhttp3.HttpUrl;
-import okhttp3.MediaType;
import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.RequestBody;
-import okhttp3.Response;
-import okio.Buffer;
-import okio.BufferedSink;
-import okio.GzipSink;
-import okio.Okio;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.BytesMessageSender;
-import zipkin2.reporter.CheckResult;
-import zipkin2.reporter.ClosedSenderException;
import zipkin2.reporter.ConstantHttpEndpointSupplier;
import zipkin2.reporter.Encoding;
import zipkin2.reporter.HttpEndpointSupplier;
-import zipkin2.reporter.Sender;
+import zipkin2.reporter.internal.SenderAdapter;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static zipkin2.reporter.Call.propagateIfFatal;
-import static zipkin2.reporter.okhttp3.HttpCall.parseResponse;
/**
* Reports spans to Zipkin, using its POST endpoint.
@@ -86,9 +63,7 @@
*
* This sender is thread-safe.
*/
-public final class OkHttpSender extends Sender {
- static final Logger logger = Logger.getLogger(OkHttpSender.class.getName());
-
+public final class OkHttpSender extends SenderAdapter {
/** Creates a sender that posts {@link Encoding#JSON} messages. */
public static OkHttpSender create(String endpoint) {
return newBuilder().endpoint(endpoint).build();
@@ -112,13 +87,13 @@ public static final class Builder {
}
Builder(OkHttpSender sender) {
- clientBuilder = sender.client.newBuilder();
+ clientBuilder = sender.delegate.client.newBuilder();
endpointSupplierFactory = sender.endpointSupplierFactory;
endpoint = sender.endpoint;
- maxRequests = sender.client.dispatcher().getMaxRequests();
- compressionEnabled = sender.compressionEnabled;
- encoding = sender.encoding;
- messageMaxBytes = sender.messageMaxBytes;
+ maxRequests = sender.delegate.client.dispatcher().getMaxRequests();
+ compressionEnabled = sender.delegate.compressionEnabled;
+ encoding = sender.delegate.encoding;
+ messageMaxBytes = sender.delegate.messageMaxBytes;
}
/**
@@ -202,139 +177,19 @@ public OkHttpClient.Builder clientBuilder() {
}
public OkHttpSender build() {
- String endpoint = this.endpoint;
if (endpoint == null) throw new NullPointerException("endpoint == null");
-
- HttpEndpointSupplier endpointSupplier = endpointSupplierFactory.create(endpoint);
- if (endpointSupplier == null) {
- throw new NullPointerException("endpointSupplierFactory.create() returned null");
- }
- if (endpointSupplier instanceof ConstantHttpEndpointSupplier) {
- endpoint = endpointSupplier.get(); // eagerly resolve the endpoint
- return new OkHttpSender(this, new ConstantHttpUrlSupplier(endpoint));
- }
- return new OkHttpSender(this, new DynamicHttpUrlSupplier(endpointSupplier));
- }
- }
-
- static HttpUrl toHttpUrl(String endpoint) {
- HttpUrl parsed = HttpUrl.parse(endpoint);
- if (parsed == null) throw new IllegalArgumentException("invalid POST url: " + endpoint);
- return parsed;
- }
-
- static abstract class HttpUrlSupplier {
- abstract HttpUrl get();
-
- void close() {
- }
- }
-
- static final class ConstantHttpUrlSupplier extends HttpUrlSupplier {
- final HttpUrl url;
-
- ConstantHttpUrlSupplier(String endpoint) {
- this.url = toHttpUrl(endpoint);
- }
-
- @Override HttpUrl get() {
- return url;
- }
-
- @Override public String toString() {
- return url.toString();
- }
- }
-
- static final class DynamicHttpUrlSupplier extends HttpUrlSupplier {
-
- final HttpEndpointSupplier endpointSupplier;
-
- DynamicHttpUrlSupplier(HttpEndpointSupplier endpointSupplier) {
- this.endpointSupplier = endpointSupplier;
- }
-
- @Override HttpUrl get() {
- String endpoint = endpointSupplier.get();
- if (endpoint == null) throw new NullPointerException("endpointSupplier.get() returned null");
- return toHttpUrl(endpoint);
- }
-
- @Override void close() {
- try {
- endpointSupplier.close();
- } catch (Throwable t) {
- propagateIfFatal(t);
- logger.fine("ignoring error closing endpoint supplier: " + t.getMessage());
- }
- }
-
- @Override public String toString() {
- return endpointSupplier.toString();
+ return new OkHttpSender(this);
}
}
+ final InternalOkHttpSender delegate;
final HttpEndpointSupplier.Factory endpointSupplierFactory; // for toBuilder()
final String endpoint; // for toBuilder()
- final HttpUrlSupplier urlSupplier;
- final OkHttpClient client;
- final RequestBodyMessageEncoder encoder;
- final Encoding encoding;
- final int messageMaxBytes, maxRequests;
- final boolean compressionEnabled;
-
- OkHttpSender(Builder builder, HttpUrlSupplier urlSupplier) {
- endpointSupplierFactory = builder.endpointSupplierFactory; // for toBuilder()
- endpoint = builder.endpoint; // for toBuilder()
-
- this.urlSupplier = urlSupplier;
- encoding = builder.encoding;
- switch (encoding) {
- case JSON:
- encoder = RequestBodyMessageEncoder.JSON;
- break;
- case THRIFT:
- encoder = RequestBodyMessageEncoder.THRIFT;
- break;
- case PROTO3:
- encoder = RequestBodyMessageEncoder.PROTO3;
- break;
- default:
- throw new UnsupportedOperationException("Unsupported encoding: " + encoding.name());
- }
- maxRequests = builder.maxRequests;
- messageMaxBytes = builder.messageMaxBytes;
- compressionEnabled = builder.compressionEnabled;
- Dispatcher dispatcher = newDispatcher(maxRequests);
-
- // doing the extra "build" here prevents us from leaking our dispatcher to the builder
- client = builder.clientBuilder().build().newBuilder().dispatcher(dispatcher).build();
- }
-
- static Dispatcher newDispatcher(int maxRequests) {
- // bound the executor so that we get consistent performance
- ThreadPoolExecutor dispatchExecutor =
- new ThreadPoolExecutor(0, maxRequests, 60, TimeUnit.SECONDS,
- // Using a synchronous queue means messages will send immediately until we hit max
- // in-flight requests. Once max requests are hit, send will block the caller, which is
- // the AsyncReporter flush thread. This is ok, as the AsyncReporter has a buffer of
- // unsent spans for this purpose.
- new SynchronousQueue(),
- OkHttpSenderThreadFactory.INSTANCE);
-
- Dispatcher dispatcher = new Dispatcher(dispatchExecutor);
- dispatcher.setMaxRequests(maxRequests);
- dispatcher.setMaxRequestsPerHost(maxRequests);
- return dispatcher;
- }
-
- enum OkHttpSenderThreadFactory implements ThreadFactory {
- INSTANCE;
-
- @Override public Thread newThread(Runnable r) {
- return new Thread(r, "OkHttpSender Dispatcher");
- }
+ OkHttpSender(Builder builder) {
+ this.delegate = new InternalOkHttpSender(builder);
+ this.endpointSupplierFactory = builder.endpointSupplierFactory; // for toBuilder()
+ this.endpoint = builder.endpoint; // for toBuilder()
}
/**
@@ -345,118 +200,7 @@ public Builder toBuilder() {
return new Builder(this);
}
- @Override public int messageSizeInBytes(List encodedSpans) {
- return encoding.listSizeInBytes(encodedSpans);
- }
-
- @Override public int messageSizeInBytes(int encodedSizeInBytes) {
- return encoding.listSizeInBytes(encodedSizeInBytes);
- }
-
- @Override public Encoding encoding() {
- return encoding;
- }
-
- @Override public int messageMaxBytes() {
- return messageMaxBytes;
- }
-
- /** close is typically called from a different thread */
- final AtomicBoolean closeCalled = new AtomicBoolean();
-
- /** {@inheritDoc} */
- @Override @Deprecated public zipkin2.reporter.Call sendSpans(List encodedSpans) {
- if (closeCalled.get()) throw new ClosedSenderException();
- Request request;
- try {
- request = newRequest(encoder.encode(encodedSpans));
- } catch (IOException e) {
- throw Platform.get().uncheckedIOException(e);
- }
- return new HttpCall(client.newCall(request));
- }
-
- /** Sends spans as a POST to {@link Builder#endpoint(String)}. */
- @Override public void send(List encodedSpans) throws IOException {
- if (closeCalled.get()) throw new ClosedSenderException();
- Request request = newRequest(encoder.encode(encodedSpans));
- Call call = client.newCall(request);
- parseResponse(call.execute());
- }
-
- /** {@inheritDoc} */
- @Override @Deprecated public CheckResult check() {
- try {
- Request request = new Request.Builder().url(urlSupplier.get())
- .post(encoder.encode(Collections.emptyList())).build();
- try (Response response = client.newCall(request).execute()) {
- if (!response.isSuccessful()) {
- return CheckResult.failed(new RuntimeException("check response failed: " + response));
- }
- }
- return CheckResult.OK;
- } catch (Exception e) {
- return CheckResult.failed(e);
- }
- }
-
- /** Waits up to a second for in-flight requests to finish before cancelling them */
- @Override public void close() {
- if (!closeCalled.compareAndSet(false, true)) return; // already closed
-
- urlSupplier.close();
-
- Dispatcher dispatcher = client.dispatcher();
- dispatcher.executorService().shutdown();
- try {
- if (!dispatcher.executorService().awaitTermination(1, TimeUnit.SECONDS)) {
- dispatcher.cancelAll();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- Request newRequest(RequestBody body) throws IOException {
- Request.Builder request = new Request.Builder().url(urlSupplier.get());
- // Amplification can occur when the Zipkin endpoint is proxied, and the proxy is instrumented.
- // This prevents that in proxies, such as Envoy, that understand B3 single format,
- request.addHeader("b3", "0");
- if (compressionEnabled) {
- request.addHeader("Content-Encoding", "gzip");
- Buffer gzipped = new Buffer();
- BufferedSink gzipSink = Okio.buffer(new GzipSink(gzipped));
- body.writeTo(gzipSink);
- gzipSink.close();
- body = new BufferRequestBody(body.contentType(), gzipped);
- }
- request.post(body);
- return request.build();
- }
-
- @Override public String toString() {
- return "OkHttpSender{" + urlSupplier + "}";
- }
-
- static final class BufferRequestBody extends RequestBody {
- final MediaType contentType;
- final Buffer body;
-
- BufferRequestBody(MediaType contentType, Buffer body) {
- this.contentType = contentType;
- this.body = body;
- }
-
- @Override public long contentLength() {
- return body.size();
- }
-
- @Override public MediaType contentType() {
- return contentType;
- }
-
- @Override public void writeTo(BufferedSink sink) throws IOException {
- sink.write(body, body.size());
- }
+ @Override protected BytesMessageSender delegate() {
+ return delegate;
}
}
diff --git a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/RequestBodyMessageEncoder.java b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/RequestBodyMessageEncoder.java
index 133f37d4..928a6a8d 100644
--- a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/RequestBodyMessageEncoder.java
+++ b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/RequestBodyMessageEncoder.java
@@ -38,6 +38,19 @@ enum RequestBodyMessageEncoder {
}
};
+ static RequestBodyMessageEncoder forEncoding(Encoding encoding) {
+ switch (encoding) {
+ case JSON:
+ return RequestBodyMessageEncoder.JSON;
+ case THRIFT:
+ return RequestBodyMessageEncoder.THRIFT;
+ case PROTO3:
+ return RequestBodyMessageEncoder.PROTO3;
+ default:
+ throw new UnsupportedOperationException("Unsupported encoding: " + encoding.name());
+ }
+ }
+
static abstract class StreamingRequestBody extends RequestBody {
final MediaType contentType;
final List values;
diff --git a/okhttp3/src/test/java/zipkin2/reporter/okhttp3/ITOkHttpSender.java b/okhttp3/src/test/java/zipkin2/reporter/okhttp3/ITOkHttpSender.java
index 48832a7c..ed8c0f51 100644
--- a/okhttp3/src/test/java/zipkin2/reporter/okhttp3/ITOkHttpSender.java
+++ b/okhttp3/src/test/java/zipkin2/reporter/okhttp3/ITOkHttpSender.java
@@ -193,84 +193,6 @@ public class ITOkHttpSender { // public for use in src/it
.isEqualTo("application/json");
}
- @Deprecated
- @Test void closeWhileRequestInFlight_cancelsRequest() throws Exception {
- server.shutdown(); // shutdown the normal zipkin rule
- sender.close();
-
- MockWebServer server = new MockWebServer();
- server.setDispatcher(new Dispatcher() {
- @Override public MockResponse dispatch(RecordedRequest request) throws InterruptedException {
- Thread.sleep(2000); // lingers after the one-second grace
- return new MockResponse();
- }
- });
- try {
- sender = OkHttpSender.create(server.url("/api/v1/spans").toString());
-
- AwaitableCallback callback = new AwaitableCallback();
-
- Call call = sender.sendSpans(asList(SpanBytesEncoder.JSON_V2.encode(CLIENT_SPAN)));
- new Thread(() ->
- call.enqueue(callback)
- ).start();
- Thread.sleep(100); // make sure the thread starts
-
- sender.close(); // close while request is in flight
-
- try {
- callback.await();
- failBecauseExceptionWasNotThrown(RuntimeException.class);
- } catch (RuntimeException e) {
- // throws because the request still in flight after a second was canceled
- assertThat(e.getCause()).isInstanceOf(IOException.class);
- }
- } finally {
- server.shutdown();
- }
- }
-
- /**
- * Each message by default is up to 5MiB, make sure these go out of process as soon as they can.
- */
- @Deprecated
- @Test void messagesSendImmediately() throws Exception {
- server.shutdown(); // shutdown the normal zipkin rule
- sender.close();
-
- CountDownLatch latch = new CountDownLatch(1);
- MockWebServer server = new MockWebServer();
- server.setDispatcher(new Dispatcher() {
- AtomicInteger count = new AtomicInteger();
-
- @Override public MockResponse dispatch(RecordedRequest request) throws InterruptedException {
- if (count.incrementAndGet() == 1) {
- latch.await();
- } else {
- latch.countDown();
- }
- return new MockResponse();
- }
- });
- try (OkHttpSender sender = OkHttpSender.create(server.url("/api/v1/spans").toString())) {
-
- AwaitableCallback callback1 = new AwaitableCallback();
- AwaitableCallback callback2 = new AwaitableCallback();
-
- Thread t = new Thread(() -> {
- sender.sendSpans(asList(SpanBytesEncoder.JSON_V2.encode(CLIENT_SPAN))).enqueue(callback1);
- sender.sendSpans(asList(SpanBytesEncoder.JSON_V2.encode(CLIENT_SPAN))).enqueue(callback2);
- });
- t.start();
- t.join();
-
- callback1.await();
- callback2.await();
- } finally {
- server.shutdown();
- }
- }
-
@Deprecated
@Test void closeWhileRequestInFlight_graceful() throws Exception {
server.shutdown(); // shutdown the normal zipkin rule
diff --git a/okhttp3/src/test/java/zipkin2/reporter/okhttp3/HttpCallTest.java b/okhttp3/src/test/java/zipkin2/reporter/okhttp3/InternalOkHttpSenderTest.java
similarity index 92%
rename from okhttp3/src/test/java/zipkin2/reporter/okhttp3/HttpCallTest.java
rename to okhttp3/src/test/java/zipkin2/reporter/okhttp3/InternalOkHttpSenderTest.java
index ab62dd58..4f3a0365 100644
--- a/okhttp3/src/test/java/zipkin2/reporter/okhttp3/HttpCallTest.java
+++ b/okhttp3/src/test/java/zipkin2/reporter/okhttp3/InternalOkHttpSenderTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2023 The OpenZipkin Authors
+ * Copyright 2016-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -25,7 +25,7 @@
import static org.assertj.core.api.Assertions.assertThat;
-class HttpCallTest {
+class InternalOkHttpSenderTest {
@Test void parseResponse_closesBody() throws Exception {
// It is difficult to prove close was called, this approach looks at an underlying stream
@@ -45,7 +45,7 @@ class HttpCallTest {
.body(ResponseBody.create(null, 1, Okio.buffer(Okio.source(in))))
.build();
- HttpCall.parseResponse(response);
+ InternalOkHttpSender.parseResponse(response);
assertThat(closed.get()).isTrue();
}
diff --git a/okhttp3/src/test/java/zipkin2/reporter/okhttp3/OkHttpSenderTest.java b/okhttp3/src/test/java/zipkin2/reporter/okhttp3/OkHttpSenderTest.java
index 46aa4cb5..e44b3953 100644
--- a/okhttp3/src/test/java/zipkin2/reporter/okhttp3/OkHttpSenderTest.java
+++ b/okhttp3/src/test/java/zipkin2/reporter/okhttp3/OkHttpSenderTest.java
@@ -69,125 +69,6 @@ class OkHttpSenderTest {
.isInstanceOf(IOException.class);
}
- @Test void illegalToSendWhenClosed() {
- sender.close();
-
- assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN))
- .isInstanceOf(ClosedSenderException.class);
- }
-
- @Test void endpointSupplierFactory_defaultsToConstant() {
- // The default connection supplier returns a constant URL
- assertThat(sender)
- .extracting("urlSupplier.url")
- .isEqualTo(HttpUrl.parse("http://localhost:19092"));
- }
-
- @Test void endpointSupplierFactory_constant() {
- sender.close();
- sender = sender.toBuilder()
- .endpointSupplierFactory(e -> ConstantHttpEndpointSupplier.create("http://localhost:29092"))
- .build();
-
- // The connection supplier has a constant URL
- assertThat(sender)
- .extracting("urlSupplier.url")
- .isEqualTo(HttpUrl.parse("http://localhost:29092"));
- }
-
- @Test void endpointSupplierFactory_constantBad() {
- OkHttpSender.Builder builder = sender.toBuilder()
- .endpointSupplierFactory(e -> ConstantHttpEndpointSupplier.create("htp://localhost:9411/api/v1/spans"));
-
- assertThatThrownBy(builder::build)
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("invalid POST url");
- }
-
- @Test void endpointSupplierFactory_dynamic() {
- AtomicInteger closeCalled = new AtomicInteger();
- HttpEndpointSupplier dynamicEndpointSupplier = new HttpEndpointSupplier() {
- @Override public String get() {
- throw new UnsupportedOperationException();
- }
-
- @Override public void close() {
- closeCalled.incrementAndGet();
- }
- };
-
- sender.close();
- sender = sender.toBuilder()
- .endpointSupplierFactory(e -> dynamicEndpointSupplier)
- .build();
-
- // The connection supplier is deferred until send
- assertThat(sender)
- .extracting("urlSupplier.endpointSupplier")
- .isEqualTo(dynamicEndpointSupplier);
-
- assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN))
- .isInstanceOf(UnsupportedOperationException.class);
-
- // Ensure that closing the sender closes the endpoint supplier
- sender.close();
- sender.close(); // check only closed once
- assertThat(closeCalled).hasValue(1);
- }
-
- @Test void endpointSupplierFactory_ignoresCloseFailure() {
- AtomicInteger closeCalled = new AtomicInteger();
- HttpEndpointSupplier dynamicEndpointSupplier = new HttpEndpointSupplier() {
- @Override public String get() {
- throw new UnsupportedOperationException();
- }
-
- @Override public void close() throws IOException {
- closeCalled.incrementAndGet();
- throw new IOException("unexpected");
- }
- };
-
- sender.close();
- sender = sender.toBuilder()
- .endpointSupplierFactory(e -> dynamicEndpointSupplier)
- .build();
-
- // Ensure that an exception closing the endpoint supplier doesn't propagate.
- sender.close();
- assertThat(closeCalled).hasValue(1);
- }
-
- @Test void endpointSupplierFactory_dynamicNull() {
- sender.close();
- sender = sender.toBuilder()
- .endpointSupplierFactory(e -> new BaseHttpEndpointSupplier() {
- @Override public String get() {
- return null;
- }
- })
- .build();
-
- assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN))
- .isInstanceOf(NullPointerException.class)
- .hasMessage("endpointSupplier.get() returned null");
- }
-
- @Test void endpointSupplierFactory_dynamicBad() {
- sender.close();
- sender = sender.toBuilder()
- .endpointSupplierFactory(e -> new BaseHttpEndpointSupplier() {
- @Override public String get() {
- return "htp://localhost:9411/api/v1/spans";
- }
- })
- .build();
-
- assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("invalid POST url");
- }
-
/**
* The output of toString() on {@link BytesMessageSender} implementations appears in thread names
* created by {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other
@@ -198,15 +79,8 @@ class OkHttpSenderTest {
assertThat(sender).hasToString("OkHttpSender{http://localhost:19092/}");
}
- @Test void outOfBandCancel() {
- HttpCall call = (HttpCall) sender.sendSpans(Collections.emptyList());
- call.cancel();
-
- assertThat(call.isCanceled()).isTrue();
- }
-
@Test void bugGuardCache() {
- assertThat(sender.client.cache())
+ assertThat(sender.delegate.client.cache())
.withFailMessage("senders should not open a disk cache")
.isNull();
}
diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/OkHttpSenderFactoryBeanTest.java b/spring-beans/src/test/java/zipkin2/reporter/beans/OkHttpSenderFactoryBeanTest.java
index fa7ad12b..95cfa734 100755
--- a/spring-beans/src/test/java/zipkin2/reporter/beans/OkHttpSenderFactoryBeanTest.java
+++ b/spring-beans/src/test/java/zipkin2/reporter/beans/OkHttpSenderFactoryBeanTest.java
@@ -13,7 +13,7 @@
*/
package zipkin2.reporter.beans;
-import java.util.Arrays;
+import java.util.List;
import okhttp3.HttpUrl;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -41,7 +41,7 @@ class OkHttpSenderFactoryBeanTest {
);
assertThat(context.getBean("sender", OkHttpSender.class))
- .extracting("urlSupplier.endpointSupplier")
+ .extracting("delegate.endpointSupplier")
.isEqualTo(FakeEndpointSupplier.INSTANCE);
}
@@ -53,7 +53,7 @@ class OkHttpSenderFactoryBeanTest {
);
assertThat(context.getBean("sender", OkHttpSender.class))
- .extracting("urlSupplier.url")
+ .extracting("delegate.endpoint")
.isEqualTo(HttpUrl.parse("http://localhost:9411/api/v2/spans"));
}
@@ -66,7 +66,7 @@ class OkHttpSenderFactoryBeanTest {
);
assertThat(context.getBean("sender", OkHttpSender.class))
- .extracting("client.connectTimeoutMillis")
+ .extracting("delegate.client.connectTimeoutMillis")
.isEqualTo(1000);
}
@@ -79,7 +79,7 @@ class OkHttpSenderFactoryBeanTest {
);
assertThat(context.getBean("sender", OkHttpSender.class))
- .extracting("client.writeTimeoutMillis")
+ .extracting("delegate.client.writeTimeoutMillis")
.isEqualTo(1000);
}
@@ -92,7 +92,7 @@ class OkHttpSenderFactoryBeanTest {
);
assertThat(context.getBean("sender", OkHttpSender.class))
- .extracting("client.readTimeoutMillis")
+ .extracting("delegate.client.readTimeoutMillis")
.isEqualTo(1000);
}
@@ -105,7 +105,7 @@ class OkHttpSenderFactoryBeanTest {
);
assertThat(context.getBean("sender", OkHttpSender.class))
- .extracting("client.dispatcher.maxRequests")
+ .extracting("delegate.client.dispatcher.maxRequests")
.isEqualTo(4);
}
@@ -118,7 +118,7 @@ class OkHttpSenderFactoryBeanTest {
);
assertThat(context.getBean("sender", OkHttpSender.class))
- .extracting("compressionEnabled")
+ .extracting("delegate.compressionEnabled")
.isEqualTo(false);
}
@@ -131,7 +131,7 @@ class OkHttpSenderFactoryBeanTest {
);
assertThat(context.getBean("sender", OkHttpSender.class))
- .extracting("messageMaxBytes")
+ .extracting("delegate.messageMaxBytes")
.isEqualTo(1024);
}
@@ -144,7 +144,7 @@ class OkHttpSenderFactoryBeanTest {
);
assertThat(context.getBean("sender", OkHttpSender.class))
- .extracting("encoding")
+ .extracting("delegate.encoding")
.isEqualTo(Encoding.PROTO3);
}
@@ -159,7 +159,7 @@ class OkHttpSenderFactoryBeanTest {
OkHttpSender sender = context.getBean("sender", OkHttpSender.class);
context.close();
- sender.send(Arrays.asList(new byte[]{'{', '}'}));
+ sender.send(List.of(new byte[]{'{', '}'}));
});
}
}
diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/URLConnectionSenderFactoryBeanTest.java b/spring-beans/src/test/java/zipkin2/reporter/beans/URLConnectionSenderFactoryBeanTest.java
index 52d0b198..826d6d43 100755
--- a/spring-beans/src/test/java/zipkin2/reporter/beans/URLConnectionSenderFactoryBeanTest.java
+++ b/spring-beans/src/test/java/zipkin2/reporter/beans/URLConnectionSenderFactoryBeanTest.java
@@ -16,6 +16,7 @@
import java.net.MalformedURLException;
import java.net.URI;
import java.util.Arrays;
+import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import zipkin2.reporter.Encoding;
@@ -42,7 +43,7 @@ class URLConnectionSenderFactoryBeanTest {
);
assertThat(context.getBean("sender", URLConnectionSender.class))
- .extracting("connectionSupplier.endpointSupplier")
+ .extracting("delegate.endpointSupplier")
.isEqualTo(FakeEndpointSupplier.INSTANCE);
}
@@ -54,7 +55,7 @@ class URLConnectionSenderFactoryBeanTest {
);
assertThat(context.getBean("sender", URLConnectionSender.class))
- .extracting("connectionSupplier.url")
+ .extracting("delegate.endpoint")
.isEqualTo(URI.create("http://localhost:9411/api/v2/spans").toURL());
}
@@ -98,7 +99,7 @@ class URLConnectionSenderFactoryBeanTest {
);
assertThat(context.getBean("sender", URLConnectionSender.class))
- .extracting("compressionEnabled")
+ .extracting("delegate.compressionEnabled")
.isEqualTo(false);
}
@@ -111,7 +112,7 @@ class URLConnectionSenderFactoryBeanTest {
);
assertThat(context.getBean("sender", URLConnectionSender.class))
- .extracting("messageMaxBytes")
+ .extracting("delegate.messageMaxBytes")
.isEqualTo(1024);
}
@@ -124,7 +125,7 @@ class URLConnectionSenderFactoryBeanTest {
);
assertThat(context.getBean("sender", URLConnectionSender.class))
- .extracting("encoding")
+ .extracting("delegate.encoding")
.isEqualTo(Encoding.PROTO3);
}
@@ -139,7 +140,7 @@ class URLConnectionSenderFactoryBeanTest {
URLConnectionSender sender = context.getBean("sender", URLConnectionSender.class);
context.close();
- sender.send(Arrays.asList(new byte[]{'{', '}'}));
+ sender.send(List.of(new byte[]{'{', '}'}));
});
}
}
diff --git a/urlconnection/src/main/java/zipkin2/reporter/urlconnection/InternalURLConnectionSender.java b/urlconnection/src/main/java/zipkin2/reporter/urlconnection/InternalURLConnectionSender.java
new file mode 100644
index 00000000..8709486e
--- /dev/null
+++ b/urlconnection/src/main/java/zipkin2/reporter/urlconnection/InternalURLConnectionSender.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2016-2024 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package zipkin2.reporter.urlconnection;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.List;
+import java.util.zip.GZIPOutputStream;
+import zipkin2.reporter.Component;
+import zipkin2.reporter.HttpSender;
+
+/**
+ * We have to nest this class until v4 when {@linkplain URLConnectionSender} no longer needs to
+ * extend {@linkplain Component}.
+ */
+final class InternalURLConnectionSender extends HttpSender {
+
+ final int messageMaxBytes;
+ final int connectTimeout;
+ final int readTimeout;
+ final boolean compressionEnabled;
+
+ InternalURLConnectionSender(URLConnectionSender.Builder builder) {
+ super(builder.encoding, builder.endpointSupplierFactory, builder.endpoint);
+ this.messageMaxBytes = builder.messageMaxBytes;
+ this.connectTimeout = builder.connectTimeout;
+ this.readTimeout = builder.readTimeout;
+ this.compressionEnabled = builder.compressionEnabled;
+ }
+
+ @Override public int messageMaxBytes() {
+ return messageMaxBytes;
+ }
+
+ @Override protected URL newEndpoint(String endpoint) {
+ try {
+ return new URL(endpoint);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+
+ @Override protected byte[] newBody(List encodedSpans) {
+ return encoding.encode(encodedSpans);
+ }
+
+ @Override protected void postSpans(URL endpoint, byte[] body) throws IOException {
+ // intentionally not closing the connection, to use keep-alives
+ HttpURLConnection connection = (HttpURLConnection) endpoint.openConnection();
+ connection.setConnectTimeout(connectTimeout);
+ connection.setReadTimeout(readTimeout);
+ connection.setRequestMethod("POST");
+ // Amplification can occur when the Zipkin endpoint is proxied, and the proxy is instrumented.
+ // This prevents that in proxies, such as Envoy, that understand B3 single format,
+ connection.addRequestProperty("b3", "0");
+ connection.addRequestProperty("Content-Type", encoding.mediaType());
+ if (compressionEnabled) {
+ connection.addRequestProperty("Content-Encoding", "gzip");
+ ByteArrayOutputStream gzipped = new ByteArrayOutputStream();
+ GZIPOutputStream compressor = new GZIPOutputStream(gzipped);
+ try {
+ compressor.write(body);
+ } finally {
+ compressor.close();
+ }
+ body = gzipped.toByteArray();
+ }
+ connection.setDoOutput(true);
+ connection.setFixedLengthStreamingMode(body.length);
+ connection.getOutputStream().write(body);
+
+ skipAllContent(connection);
+ }
+
+ /** This utility is verbose as we have a minimum java version of 6 */
+ static void skipAllContent(HttpURLConnection connection) throws IOException {
+ InputStream in = connection.getInputStream();
+ IOException thrown = skipAndSuppress(in);
+ if (thrown == null) return;
+ InputStream err = connection.getErrorStream();
+ if (err != null) skipAndSuppress(err); // null is possible, if the connection was dropped
+ throw thrown;
+ }
+
+ static IOException skipAndSuppress(InputStream in) {
+ try {
+ while (in.read() != -1) ; // skip
+ return null;
+ } catch (IOException e) {
+ return e;
+ } finally {
+ try {
+ in.close();
+ } catch (IOException suppressed) {
+ }
+ }
+ }
+
+ @Override public String toString() {
+ return super.toString().replace("Internal", "");
+ }
+}
diff --git a/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java b/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java
index 07fb5378..8de1467a 100644
--- a/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java
+++ b/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java
@@ -13,35 +13,20 @@
*/
package zipkin2.reporter.urlconnection;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
import java.net.URL;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Logger;
-import java.util.zip.GZIPOutputStream;
-import zipkin2.reporter.Call;
-import zipkin2.reporter.Callback;
-import zipkin2.reporter.CheckResult;
-import zipkin2.reporter.ClosedSenderException;
+import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.ConstantHttpEndpointSupplier;
import zipkin2.reporter.Encoding;
import zipkin2.reporter.HttpEndpointSupplier;
-import zipkin2.reporter.Sender;
-
-import static zipkin2.reporter.Call.propagateIfFatal;
+import zipkin2.reporter.HttpEndpointSupplier.Factory;
+import zipkin2.reporter.internal.SenderAdapter;
/**
* Reports spans to Zipkin, using its POST endpoint.
*
* This sender is thread-safe.
*/
-public final class URLConnectionSender extends Sender {
- static final Logger logger = Logger.getLogger(URLConnectionSender.class.getName());
+public final class URLConnectionSender extends SenderAdapter {
/** Creates a sender that posts {@link Encoding#JSON} messages. */
public static URLConnectionSender create(String endpoint) {
@@ -53,7 +38,7 @@ public static Builder newBuilder() {
}
public static final class Builder {
- HttpEndpointSupplier.Factory endpointSupplierFactory = ConstantHttpEndpointSupplier.FACTORY;
+ Factory endpointSupplierFactory = ConstantHttpEndpointSupplier.FACTORY;
String endpoint;
Encoding encoding = Encoding.JSON;
int messageMaxBytes = 500000;
@@ -63,17 +48,17 @@ public static final class Builder {
Builder(URLConnectionSender sender) {
this.endpointSupplierFactory = sender.endpointSupplierFactory;
this.endpoint = sender.endpoint;
- this.encoding = sender.encoding;
- this.messageMaxBytes = sender.messageMaxBytes;
- this.connectTimeout = sender.connectTimeout;
- this.readTimeout = sender.readTimeout;
- this.compressionEnabled = sender.compressionEnabled;
+ this.encoding = sender.delegate.encoding();
+ this.messageMaxBytes = sender.delegate.messageMaxBytes;
+ this.connectTimeout = sender.delegate.connectTimeout;
+ this.readTimeout = sender.delegate.readTimeout;
+ this.compressionEnabled = sender.delegate.compressionEnabled;
}
/**
* No default. See JavaDoc on {@link HttpEndpointSupplier} for implementation notes.
*/
- public Builder endpointSupplierFactory(HttpEndpointSupplier.Factory endpointSupplierFactory) {
+ public Builder endpointSupplierFactory(Factory endpointSupplierFactory) {
if (endpointSupplierFactory == null) {
throw new NullPointerException("endpointSupplierFactory == null");
}
@@ -135,235 +120,29 @@ public Builder encoding(Encoding encoding) {
}
public URLConnectionSender build() {
- String endpoint = this.endpoint;
if (endpoint == null) throw new NullPointerException("endpoint == null");
-
- HttpEndpointSupplier endpointSupplier = endpointSupplierFactory.create(endpoint);
- if (endpointSupplier == null) {
- throw new NullPointerException("endpointSupplierFactory.create() returned null");
- }
- if (endpointSupplier instanceof ConstantHttpEndpointSupplier) {
- endpoint = endpointSupplier.get(); // eagerly resolve the endpoint
- return new URLConnectionSender(this, new ConstantHttpURLConnectionSupplier(endpoint));
- }
- return new URLConnectionSender(this, new DynamicHttpURLConnectionSupplier(endpointSupplier));
+ return new URLConnectionSender(this);
}
Builder() {
}
}
- static URL toURL(String endpoint) {
- try {
- return new URL(endpoint);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException(e.getMessage());
- }
- }
-
- static abstract class HttpURLConnectionSupplier {
- abstract HttpURLConnection openConnection() throws IOException;
-
- void close() {
- }
- }
-
- static final class ConstantHttpURLConnectionSupplier extends HttpURLConnectionSupplier {
- final URL url;
-
- ConstantHttpURLConnectionSupplier(String endpoint) {
- this.url = toURL(endpoint);
- }
-
- @Override HttpURLConnection openConnection() throws IOException {
- return (HttpURLConnection) url.openConnection();
- }
-
- @Override public String toString() {
- return url.toString();
- }
- }
-
- static final class DynamicHttpURLConnectionSupplier extends HttpURLConnectionSupplier {
- final HttpEndpointSupplier endpointSupplier;
-
- DynamicHttpURLConnectionSupplier(HttpEndpointSupplier endpointSupplier) {
- this.endpointSupplier = endpointSupplier;
- }
-
- @Override HttpURLConnection openConnection() throws IOException {
- String endpoint = endpointSupplier.get();
- if (endpoint == null) throw new NullPointerException("endpointSupplier.get() returned null");
- URL url = toURL(endpoint);
- return (HttpURLConnection) url.openConnection();
- }
-
- @Override void close() {
- try {
- endpointSupplier.close();
- } catch (Throwable t) {
- propagateIfFatal(t);
- logger.fine("ignoring error closing endpoint supplier: " + t.getMessage());
- }
- }
-
- @Override public String toString() {
- return endpointSupplier.toString();
- }
- }
-
- final HttpEndpointSupplier.Factory endpointSupplierFactory; // for toBuilder()
+ final InternalURLConnectionSender delegate;
+ final Factory endpointSupplierFactory; // for toBuilder()
final String endpoint; // for toBuilder()
- final HttpURLConnectionSupplier connectionSupplier;
- final Encoding encoding;
- final int messageMaxBytes;
- final int connectTimeout, readTimeout;
- final boolean compressionEnabled;
-
- URLConnectionSender(Builder builder, HttpURLConnectionSupplier connectionSupplier) {
+ URLConnectionSender(Builder builder) {
+ this.delegate = new InternalURLConnectionSender(builder);
this.endpointSupplierFactory = builder.endpointSupplierFactory; // for toBuilder()
this.endpoint = builder.endpoint; // for toBuilder()
-
- this.connectionSupplier = connectionSupplier;
- this.encoding = builder.encoding;
- this.messageMaxBytes = builder.messageMaxBytes;
- this.connectTimeout = builder.connectTimeout;
- this.readTimeout = builder.readTimeout;
- this.compressionEnabled = builder.compressionEnabled;
}
public Builder toBuilder() {
return new Builder(this);
}
- /** close is typically called from a different thread */
- final AtomicBoolean closeCalled = new AtomicBoolean();
-
- @Override public int messageSizeInBytes(List encodedSpans) {
- return encoding().listSizeInBytes(encodedSpans);
- }
-
- @Override public int messageSizeInBytes(int encodedSizeInBytes) {
- return encoding().listSizeInBytes(encodedSizeInBytes);
- }
-
- @Override public Encoding encoding() {
- return encoding;
- }
-
- @Override public int messageMaxBytes() {
- return messageMaxBytes;
- }
-
- /** {@inheritDoc} */
- @Override @Deprecated public Call sendSpans(List encodedSpans) {
- if (closeCalled.get()) throw new ClosedSenderException();
- return new HttpPostCall(encoding.encode(encodedSpans));
- }
-
- /** Sends spans as a POST to {@link Builder#endpoint}. */
- @Override public void send(List encodedSpans) throws IOException {
- if (closeCalled.get()) throw new ClosedSenderException();
- send(encoding.encode(encodedSpans));
- }
-
- /** {@inheritDoc} */
- @Override @Deprecated public CheckResult check() {
- try {
- send(encoding.encode(Collections.emptyList()));
- return CheckResult.OK;
- } catch (Throwable e) {
- Call.propagateIfFatal(e);
- return CheckResult.failed(e);
- }
- }
-
- void send(byte[] body) throws IOException {
- // intentionally not closing the connection, to use keep-alives
- HttpURLConnection connection = connectionSupplier.openConnection();
- connection.setConnectTimeout(connectTimeout);
- connection.setReadTimeout(readTimeout);
- connection.setRequestMethod("POST");
- // Amplification can occur when the Zipkin endpoint is proxied, and the proxy is instrumented.
- // This prevents that in proxies, such as Envoy, that understand B3 single format,
- connection.addRequestProperty("b3", "0");
- connection.addRequestProperty("Content-Type", encoding.mediaType());
- if (compressionEnabled) {
- connection.addRequestProperty("Content-Encoding", "gzip");
- ByteArrayOutputStream gzipped = new ByteArrayOutputStream();
- GZIPOutputStream compressor = new GZIPOutputStream(gzipped);
- try {
- compressor.write(body);
- } finally {
- compressor.close();
- }
- body = gzipped.toByteArray();
- }
- connection.setDoOutput(true);
- connection.setFixedLengthStreamingMode(body.length);
- connection.getOutputStream().write(body);
-
- skipAllContent(connection);
- }
-
- /** This utility is verbose as we have a minimum java version of 6 */
- static void skipAllContent(HttpURLConnection connection) throws IOException {
- InputStream in = connection.getInputStream();
- IOException thrown = skipAndSuppress(in);
- if (thrown == null) return;
- InputStream err = connection.getErrorStream();
- if (err != null) skipAndSuppress(err); // null is possible, if the connection was dropped
- throw thrown;
- }
-
- static IOException skipAndSuppress(InputStream in) {
- try {
- while (in.read() != -1) ; // skip
- return null;
- } catch (IOException e) {
- return e;
- } finally {
- try {
- in.close();
- } catch (IOException suppressed) {
- }
- }
- }
-
- @Override public void close() {
- if (!closeCalled.compareAndSet(false, true)) return; // already closed
- connectionSupplier.close();
- }
-
- @Override public String toString() {
- return "URLConnectionSender{" + connectionSupplier + "}";
- }
-
- class HttpPostCall extends Call.Base {
- private final byte[] message;
-
- HttpPostCall(byte[] message) {
- this.message = message;
- }
-
- @Override protected Void doExecute() throws IOException {
- send(message);
- return null;
- }
-
- @Override protected void doEnqueue(Callback callback) {
- try {
- send(message);
- callback.onSuccess(null);
- } catch (Throwable t) {
- Call.propagateIfFatal(t);
- callback.onError(t);
- }
- }
-
- @Override public Call clone() {
- return new HttpPostCall(message);
- }
+ @Override protected BytesMessageSender delegate() {
+ return delegate;
}
}
diff --git a/urlconnection/src/test/java/zipkin2/reporter/urlconnection/URLConnectionSenderTest.java b/urlconnection/src/test/java/zipkin2/reporter/urlconnection/URLConnectionSenderTest.java
index e6729de8..cf5ae522 100644
--- a/urlconnection/src/test/java/zipkin2/reporter/urlconnection/URLConnectionSenderTest.java
+++ b/urlconnection/src/test/java/zipkin2/reporter/urlconnection/URLConnectionSenderTest.java
@@ -63,130 +63,6 @@ class URLConnectionSenderTest {
.hasToString("URLConnectionSender{http://localhost:29092}");
}
- @Test void sendFailsWhenEndpointIsDown() {
- assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN))
- .isInstanceOf(ConnectException.class);
- }
-
- @Test void illegalToSendWhenClosed() {
- sender.close();
-
- assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN))
- .isInstanceOf(ClosedSenderException.class);
- }
-
- @Test void endpointSupplierFactory_defaultsToConstant() throws MalformedURLException {
- // The default connection supplier returns a constant URL
- assertThat(sender)
- .extracting("connectionSupplier.url")
- .isEqualTo(URI.create("http://localhost:19092").toURL());
- }
-
- @Test void endpointSupplierFactory_constant() throws MalformedURLException {
- sender.close();
- sender = sender.toBuilder()
- .endpointSupplierFactory(e -> ConstantHttpEndpointSupplier.create("http://localhost:29092"))
- .build();
-
- // The connection supplier has a constant URL
- assertThat(sender)
- .extracting("connectionSupplier.url")
- .isEqualTo(URI.create("http://localhost:29092").toURL());
- }
-
- @Test void endpointSupplierFactory_constantBad() {
- URLConnectionSender.Builder builder = sender.toBuilder()
- .endpointSupplierFactory(e -> ConstantHttpEndpointSupplier.create("htp://localhost:9411/api/v1/spans"));
-
- assertThatThrownBy(builder::build)
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessage("unknown protocol: htp");
- }
-
- @Test void endpointSupplierFactory_dynamic() {
- AtomicInteger closeCalled = new AtomicInteger();
- HttpEndpointSupplier dynamicEndpointSupplier = new HttpEndpointSupplier() {
- @Override public String get() {
- throw new UnsupportedOperationException();
- }
-
- @Override public void close() {
- closeCalled.incrementAndGet();
- }
- };
-
- sender.close();
- sender = sender.toBuilder()
- .endpointSupplierFactory(e -> dynamicEndpointSupplier)
- .build();
-
- // The connection supplier is deferred until send
- assertThat(sender)
- .extracting("connectionSupplier.endpointSupplier")
- .isEqualTo(dynamicEndpointSupplier);
-
- assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN))
- .isInstanceOf(UnsupportedOperationException.class);
-
- // Ensure that closing the sender closes the endpoint supplier
- sender.close();
- sender.close(); // check only closed once
- assertThat(closeCalled).hasValue(1);
- }
-
- @Test void endpointSupplierFactory_ignoresCloseFailure() {
- AtomicInteger closeCalled = new AtomicInteger();
- HttpEndpointSupplier dynamicEndpointSupplier = new HttpEndpointSupplier() {
- @Override public String get() {
- throw new UnsupportedOperationException();
- }
-
- @Override public void close() throws IOException {
- closeCalled.incrementAndGet();
- throw new IOException("unexpected");
- }
- };
-
- sender.close();
- sender = sender.toBuilder()
- .endpointSupplierFactory(e -> dynamicEndpointSupplier)
- .build();
-
- // Ensure that an exception closing the endpoint supplier doesn't propagate.
- sender.close();
- assertThat(closeCalled).hasValue(1);
- }
-
- @Test void endpointSupplierFactory_dynamicNull() {
- sender.close();
- sender = sender.toBuilder()
- .endpointSupplierFactory(e -> new BaseHttpEndpointSupplier() {
- @Override public String get() {
- return null;
- }
- })
- .build();
-
- assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN))
- .isInstanceOf(NullPointerException.class)
- .hasMessage("endpointSupplier.get() returned null");
- }
-
- @Test void endpointSupplierFactory_dynamicBad() {
- sender.close();
- sender = sender.toBuilder()
- .endpointSupplierFactory(e -> new BaseHttpEndpointSupplier() {
- @Override public String get() {
- return "htp://localhost:9411/api/v1/spans";
- }
- })
- .build();
-
- assertThatThrownBy(() -> sendSpans(sender, CLIENT_SPAN, CLIENT_SPAN))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessage("unknown protocol: htp");
- }
-
/**
* The output of toString() on {@link BytesMessageSender} implementations appears in thread names
* created by {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other