Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nested Mqtt5Exceptions are now mapped to Mqtt3Exceptions #316

Merged
merged 4 commits into from
Aug 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ private static class MqttPublishes implements Mqtt5Publishes, FlowableSubscriber
private final @NotNull AtomicReference<@Nullable Subscription> subscription = new AtomicReference<>();
private final @NotNull LinkedList<Entry> entries = new LinkedList<>();
private @Nullable Mqtt5Publish queuedPublish;
private boolean cancelled;
private @Nullable Throwable error;

MqttPublishes(final @NotNull Flowable<Mqtt5Publish> publishes) {
publishes.subscribe(this);
Expand All @@ -192,7 +192,7 @@ private void request() {
@Override
public void onNext(final @NotNull Mqtt5Publish publish) {
synchronized (entries) {
if (cancelled) {
if (error != null) {
return;
}
Entry entry;
Expand All @@ -216,9 +216,10 @@ public void onComplete() {
@Override
public void onError(final @NotNull Throwable t) {
synchronized (entries) {
if (cancelled) {
if (error != null) {
return;
}
error = t;
Entry entry;
while ((entry = entries.poll()) != null) {
entry.result.set(t);
Expand All @@ -231,8 +232,8 @@ public void onError(final @NotNull Throwable t) {
public @NotNull Mqtt5Publish receive() throws InterruptedException {
final Entry entry;
synchronized (entries) {
if (cancelled) {
throw new CancellationException();
if (error != null) {
throw handleError(error);
}
final Mqtt5Publish publish = receiveNowUnsafe();
if (publish != null) {
Expand All @@ -253,10 +254,7 @@ public void onError(final @NotNull Throwable t) {
return (Mqtt5Publish) result;
}
if (result instanceof Throwable) {
if (result instanceof RuntimeException) {
throw AsyncRuntimeException.fillInStackTrace((RuntimeException) result);
}
throw new RuntimeException((Throwable) result);
throw handleError((Throwable) result);
}
if (interruptedException != null) {
throw interruptedException;
Expand All @@ -275,8 +273,8 @@ public void onError(final @NotNull Throwable t) {

final Entry entry;
synchronized (entries) {
if (cancelled) {
throw new CancellationException();
if (error != null) {
throw handleError(error);
}
final Mqtt5Publish publish = receiveNowUnsafe();
if (publish != null) {
Expand All @@ -297,10 +295,7 @@ public void onError(final @NotNull Throwable t) {
return Optional.of((Mqtt5Publish) result);
}
if (result instanceof Throwable) {
if (result instanceof RuntimeException) {
throw AsyncRuntimeException.fillInStackTrace((RuntimeException) result);
}
throw new RuntimeException((Throwable) result);
throw handleError((Throwable) result);
}
if (interruptedException != null) {
throw interruptedException;
Expand All @@ -312,8 +307,8 @@ public void onError(final @NotNull Throwable t) {
public @NotNull Optional<Mqtt5Publish> receiveNow() {
final Mqtt5Publish publish;
synchronized (entries) {
if (cancelled) {
throw new CancellationException();
if (error != null) {
throw handleError(error);
}
publish = receiveNowUnsafe();
}
Expand All @@ -337,18 +332,25 @@ public void close() {
subscription.cancel();
}
synchronized (entries) {
if (cancelled) {
if (error != null) {
return;
}
cancelled = true;
error = new CancellationException();
Entry entry;
while ((entry = entries.poll()) != null) {
entry.result.set(new CancellationException());
entry.result.set(error);
entry.latch.countDown();
}
}
}

private @NotNull RuntimeException handleError(final @NotNull Throwable t) {
if (t instanceof RuntimeException) {
return AsyncRuntimeException.fillInStackTrace((RuntimeException) t);
}
throw new RuntimeException(t);
}

private static class Entry {

static final @NotNull Object CANCELLED = new Object();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.hivemq.client.internal.mqtt.message.unsubscribe.MqttUnsubscribe;
import com.hivemq.client.internal.mqtt.mqtt3.exceptions.Mqtt3ExceptionFactory;
import com.hivemq.client.internal.mqtt.util.MqttChecks;
import com.hivemq.client.internal.util.AsyncRuntimeException;
import com.hivemq.client.internal.util.Checks;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
Expand Down Expand Up @@ -68,7 +67,7 @@ public class Mqtt3BlockingClientView implements Mqtt3BlockingClient {
try {
return Mqtt3ConnAckView.of(delegate.connect(mqttConnect));
} catch (final Mqtt5MessageException e) {
throw AsyncRuntimeException.fillInStackTrace(Mqtt3ExceptionFactory.map(e));
throw Mqtt3ExceptionFactory.mapWithStackTrace(e);
}
}

Expand All @@ -78,7 +77,7 @@ public class Mqtt3BlockingClientView implements Mqtt3BlockingClient {
try {
return Mqtt3SubAckView.of(delegate.subscribe(mqttSubscribe));
} catch (final Mqtt5MessageException e) {
throw AsyncRuntimeException.fillInStackTrace(Mqtt3ExceptionFactory.map(e));
throw Mqtt3ExceptionFactory.mapWithStackTrace(e);
}
}

Expand All @@ -95,7 +94,7 @@ public void unsubscribe(final @Nullable Mqtt3Unsubscribe unsubscribe) {
try {
delegate.unsubscribe(mqttUnsubscribe);
} catch (final Mqtt5MessageException e) {
throw AsyncRuntimeException.fillInStackTrace(Mqtt3ExceptionFactory.map(e));
throw Mqtt3ExceptionFactory.mapWithStackTrace(e);
}
}

Expand All @@ -105,7 +104,7 @@ public void publish(final @Nullable Mqtt3Publish publish) {
try {
delegate.publish(mqttPublish);
} catch (final Mqtt5MessageException e) {
throw AsyncRuntimeException.fillInStackTrace(Mqtt3ExceptionFactory.map(e));
throw Mqtt3ExceptionFactory.mapWithStackTrace(e);
}
}

Expand All @@ -114,7 +113,7 @@ public void disconnect() {
try {
delegate.disconnect(Mqtt3DisconnectView.DELEGATE);
} catch (final Mqtt5MessageException e) {
throw AsyncRuntimeException.fillInStackTrace(Mqtt3ExceptionFactory.map(e));
throw Mqtt3ExceptionFactory.mapWithStackTrace(e);
}
}

Expand Down Expand Up @@ -145,8 +144,8 @@ private static class Mqtt3PublishesView implements Mqtt3Publishes {
public @NotNull Mqtt3Publish receive() throws InterruptedException {
try {
return Mqtt3PublishView.of(delegate.receive());
} catch (final Mqtt5MessageException e) {
throw AsyncRuntimeException.fillInStackTrace(Mqtt3ExceptionFactory.map(e));
} catch (final RuntimeException e) {
throw Mqtt3ExceptionFactory.mapWithStackTrace(e);
}
}

Expand All @@ -161,17 +160,17 @@ private static class Mqtt3PublishesView implements Mqtt3Publishes {

try {
return delegate.receive(timeout, timeUnit).map(Mqtt3PublishView.JAVA_MAPPER);
} catch (final Mqtt5MessageException e) {
throw AsyncRuntimeException.fillInStackTrace(Mqtt3ExceptionFactory.map(e));
} catch (final RuntimeException e) {
throw Mqtt3ExceptionFactory.mapWithStackTrace(e);
}
}

@Override
public @NotNull Optional<Mqtt3Publish> receiveNow() {
try {
return delegate.receiveNow().map(Mqtt3PublishView.JAVA_MAPPER);
} catch (final Mqtt5MessageException e) {
throw AsyncRuntimeException.fillInStackTrace(Mqtt3ExceptionFactory.map(e));
} catch (final RuntimeException e) {
throw Mqtt3ExceptionFactory.mapWithStackTrace(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.hivemq.client.internal.mqtt.message.connect.connack.mqtt3.Mqtt3ConnAckView;
import com.hivemq.client.internal.mqtt.message.subscribe.suback.MqttSubAck;
import com.hivemq.client.internal.mqtt.message.subscribe.suback.mqtt3.Mqtt3SubAckView;
import com.hivemq.client.mqtt.exceptions.MqttSessionExpiredException;
import com.hivemq.client.mqtt.mqtt3.exceptions.*;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5MessageException;
import com.hivemq.client.mqtt.mqtt5.message.Mqtt5Message;
Expand All @@ -37,12 +38,33 @@ public final class Mqtt3ExceptionFactory {
Mqtt3ExceptionFactory::map;

public static @NotNull Throwable map(final @NotNull Throwable throwable) {
if (throwable instanceof Mqtt5MessageException) {
return map((Mqtt5MessageException) throwable);
if (throwable instanceof RuntimeException) {
return map((RuntimeException) throwable);
}
return throwable;
}

public static @NotNull RuntimeException map(final @NotNull RuntimeException e) {
if (e instanceof Mqtt5MessageException) {
return map((Mqtt5MessageException) e);
}
if (e instanceof MqttSessionExpiredException) {
final Throwable cause = e.getCause();
if (cause instanceof Mqtt5MessageException) {
return new MqttSessionExpiredException(e.getMessage(), map((Mqtt5MessageException) cause));
}
}
return e;
}

public static @NotNull RuntimeException mapWithStackTrace(final @NotNull RuntimeException e) {
final RuntimeException mapped = map(e);
if (mapped != e) {
mapped.setStackTrace(e.getStackTrace());
}
return mapped;
}

public static @NotNull Mqtt3MessageException map(final @NotNull Mqtt5MessageException mqtt5MessageException) {
final Mqtt5Message mqttMessage = mqtt5MessageException.getMqttMessage();
final String message = mqtt5MessageException.getMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,45 +20,54 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Arrays;

/**
* @author Silvio Giebl
*/
public abstract class AsyncRuntimeException extends RuntimeException {

public static @NotNull RuntimeException fillInStackTrace(final @NotNull RuntimeException e) {
if (e instanceof AsyncRuntimeException) {
e.fillInStackTrace();
final AsyncRuntimeException copy = ((AsyncRuntimeException) e).copy();
final StackTraceElement[] stackTrace = copy.getStackTrace();
// remove the copy and fillInStackTrace method calls from the trace
int remove = 0;
while (remove < stackTrace.length) {
final StackTraceElement stackTraceElement = stackTrace[remove];
remove++;
if (stackTraceElement.getClassName().equals(AsyncRuntimeException.class.getCanonicalName()) &&
stackTraceElement.getMethodName().equals("fillInStackTrace")) {
break;
}
}
copy.setStackTrace(Arrays.copyOfRange(stackTrace, remove, stackTrace.length));
return copy;
}
return e;
}

private final boolean afterSuper;

protected AsyncRuntimeException() {
super();
afterSuper = true;
}

protected AsyncRuntimeException(final @Nullable String message) {
super(message, null);
afterSuper = true;
}

protected AsyncRuntimeException(final @Nullable String message, final @Nullable Throwable cause) {
super(message, cause);
afterSuper = true;
}

protected AsyncRuntimeException(final @Nullable Throwable cause) {
super(cause);
afterSuper = true;
}

protected AsyncRuntimeException(final @NotNull AsyncRuntimeException e) {
super(e.getMessage(), e.getCause());
super.fillInStackTrace();
}

@Override
public synchronized @NotNull Throwable fillInStackTrace() {
if (afterSuper) {
return super.fillInStackTrace();
}
return this;
}

protected abstract @NotNull AsyncRuntimeException copy();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,13 @@ public ConnectionClosedException(final @NotNull String message) {
public ConnectionClosedException(final @NotNull Throwable cause) {
super(cause);
}

private ConnectionClosedException(final @NotNull ConnectionClosedException e) {
super(e);
}

@Override
protected @NotNull ConnectionClosedException copy() {
return new ConnectionClosedException(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,13 @@ public ConnectionFailedException(final @NotNull String message) {
public ConnectionFailedException(final @NotNull Throwable cause) {
super(cause);
}

private ConnectionFailedException(final @NotNull ConnectionFailedException e) {
super(e);
}

@Override
protected @NotNull ConnectionFailedException copy() {
return new ConnectionFailedException(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,13 @@ public class MqttClientStateException extends AsyncRuntimeException {
public MqttClientStateException(final @NotNull String message) {
super(message);
}

private MqttClientStateException(final @NotNull MqttClientStateException e) {
super(e);
}

@Override
protected @NotNull MqttClientStateException copy() {
return new MqttClientStateException(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,13 @@ public class MqttDecodeException extends AsyncRuntimeException {
public MqttDecodeException(final @NotNull String message) {
super(message);
}

private MqttDecodeException(final @NotNull MqttDecodeException e) {
super(e);
}

@Override
protected @NotNull MqttDecodeException copy() {
return new MqttDecodeException(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,13 @@ public class MqttEncodeException extends AsyncRuntimeException {
public MqttEncodeException(final @NotNull String message) {
super(message);
}

private MqttEncodeException(final @NotNull MqttEncodeException e) {
super(e);
}

@Override
protected @NotNull MqttEncodeException copy() {
return new MqttEncodeException(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,13 @@ public class MqttSessionExpiredException extends AsyncRuntimeException {
public MqttSessionExpiredException(final @NotNull String message, final @NotNull Throwable cause) {
super(message, cause);
}

private MqttSessionExpiredException(final @NotNull MqttSessionExpiredException e) {
super(e);
}

@Override
protected @NotNull MqttSessionExpiredException copy() {
return new MqttSessionExpiredException(this);
}
}
Loading