Skip to content

Commit

Permalink
Adds call utilities commonly needed (#1708)
Browse files Browse the repository at this point in the history
This adds `Call.emptyList()` needed for almost all span store operations
`Call.map()` for translating objects,  and `Call.flatMap()` needed to
transform an ID list into a get by ID call.
  • Loading branch information
adriancole authored Aug 28, 2017
1 parent ac0ba02 commit 88f1f29
Show file tree
Hide file tree
Showing 2 changed files with 282 additions and 38 deletions.
172 changes: 158 additions & 14 deletions zipkin/src/main/java/zipkin/internal/v2/Call.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
package zipkin.internal.v2;

import java.io.IOException;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;

/**
* This captures a (usually remote) request and can be used once, either {@link #execute()
Expand Down Expand Up @@ -47,13 +48,62 @@
* @param <V> the success type, only null when {@code V} is {@linkplain Void}.
*/
public abstract class Call<V> implements Cloneable {
@SuppressWarnings("unchecked")
public static <T> Call<List<T>> emptyList() {
return Call.create(Collections.emptyList());
}

public interface Mapper<V1, V2> {
V2 map(V1 input);
}

/**
* Maps the result of this call into a different shape, as defined by the {@code mapper} function.
* This is used to convert values from one type to another. For example, you could use this to
* convert between zipkin v1 and v2 span format.
*
* <pre>{@code
* getTracesV1Call = getTracesV2Call.map(traces -> v2TracesConverter);
* }</pre>
*
* <p>This method intends to be used for chaining. That means "this" instance should be discarded
* in favor of the result of this method.
*/
public final <R> Call<R> map(Mapper<V, R> mapper) {
return new Mapping<>(mapper, this);
}

public interface FlatMapper<V1, V2> {
Call<V2> map(V1 input);
}

/**
* Maps the result of this call into another, as defined by the {@code flatMapper} function. This
* is used to chain two remote calls together. For example, you could use this to chain a list IDs
* call to a get by IDs call.
*
* <pre>{@code
* getTracesCall = getIdsCall.flatMap(ids -> getTraces(ids));
*
* // this would now invoke the chain
* traces = getTracesCall.enqueue(tracesCallback);
* }</pre>
*
* Cancelation propagates to the mapped call.
*
* <p>This method intends to be used for chaining. That means "this" instance should be discarded
* in favor of the result of this method.
*/
public final <R> Call<R> flatMap(FlatMapper<V, R> flatMapper) {
return new FlatMapping<>(flatMapper, this);
}

/**
* Returns a completed call which has the supplied value. This is useful when input parameters
* imply there's no call needed. For example, an empty input might always result in an empty
* output.
*/
public static <V> Call<V> create(@Nullable V v) {
public static <V> Call<V> create(V v) {
return new Constant<>(v);
}

Expand All @@ -67,7 +117,7 @@ public static <V> Call<V> create(@Nullable V v) {
*
* @return a success value. Null is unexpected, except when {@code V} is {@linkplain Void}.
*/
@Nullable public abstract V execute() throws IOException;
public abstract V execute() throws IOException;

/**
* Invokes a request asynchronously, signaling the {@code callback} when complete. Invoking this
Expand Down Expand Up @@ -97,24 +147,120 @@ public static <V> Call<V> create(@Nullable V v) {
/** Returns a copy of this object, so you can make an identical follow-up request. */
public abstract Call<V> clone();

static final class Constant<V> extends Call<V> {
@Nullable final V v;
volatile boolean canceled;
boolean executed; // guarded by this
static final class Constant<V> extends Base<V> {
final V v;

Constant(@Nullable V v) {
Constant(V v) {
this.v = v;
}

@Override V doExecute() throws IOException {
return v;
}

@Override void doEnqueue(Callback<V> callback) {
callback.onSuccess(v);
}

@Override public Call<V> clone() {
return new Constant<>(v);
}
}

static final class Mapping<R, V> extends Base<R> {
final Mapper<V, R> mapper;
final Call<V> delegate;

Mapping(Mapper<V, R> Mapper, Call<V> delegate) {
this.mapper = Mapper;
this.delegate = delegate;
}

@Override public R doExecute() throws IOException {
return mapper.map(delegate.execute());
}

@Override public void doEnqueue(final Callback<R> callback) {
delegate.enqueue(new Callback<V>() {
@Override public void onSuccess(V value) {
try {
callback.onSuccess(mapper.map(value));
} catch (Throwable t) {
callback.onError(t);
}
}

@Override public void onError(Throwable t) {
callback.onError(t);
}
});
}

@Override public Call<R> clone() {
return new Mapping<>(mapper, delegate.clone());
}
}

static final class FlatMapping<R, V> extends Base<R> {
final FlatMapper<V, R> flatMapper;
final Call<V> delegate;
volatile Call<R> mapped;

FlatMapping(FlatMapper<V, R> flatMapper, Call<V> delegate) {
this.flatMapper = flatMapper;
this.delegate = delegate;
}

@Override public R doExecute() throws IOException {
return (mapped = flatMapper.map(delegate.execute())).execute();
}

@Override public void doEnqueue(final Callback<R> callback) {
delegate.enqueue(new Callback<V>() {
@Override public void onSuccess(V value) {
try {
(mapped = flatMapper.map(value)).enqueue(callback);
} catch (Throwable t) {
callback.onError(t);
}
}

@Override public void onError(Throwable t) {
callback.onError(t);
}
});
}

@Override public void cancel() {
super.cancel();
delegate.cancel();
if (mapped != null) mapped.cancel();
}

@Override public boolean isCanceled() {
return super.isCanceled() || delegate.isCanceled() || (mapped != null && mapped.isCanceled());
}

@Override public Call<R> clone() {
return new FlatMapping<>(flatMapper, delegate.clone());
}
}

static abstract class Base<V> extends Call<V> {
volatile boolean canceled;
boolean executed; // guarded by this

@Override public V execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
if (canceled) throw new IOException("Canceled");
return v;
return doExecute();
}

abstract V doExecute() throws IOException;

@Override public void enqueue(Callback<V> callback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
Expand All @@ -123,20 +269,18 @@ static final class Constant<V> extends Call<V> {
if (canceled) {
callback.onError(new IOException("Canceled"));
} else {
callback.onSuccess(v);
doEnqueue(callback);
}
}

abstract void doEnqueue(Callback<V> callback);

@Override public void cancel() {
canceled = true;
}

@Override public boolean isCanceled() {
return canceled;
}

@Override public Call<V> clone() {
return new Constant<>(v);
}
}
}
Loading

0 comments on commit 88f1f29

Please sign in to comment.