Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Throwable usage from transport modules #30845

Merged
merged 4 commits into from
May 24, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.concurrent;

import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/**
* A thread-safe completable context that allows listeners to be attached. This class relies on the
* {@link CompletableFuture} for the concurrency logic. However, it does not accept {@link Throwable} as
* an exceptional result. This allows attaching listeners that only handle {@link Exception}.
*
* @param <T> the result type
*/
public class CompletableContext<T> {

private final CompletableFuture<T> completableFuture = new CompletableFuture<>();

public void addListener(BiConsumer<T, ? super Exception> listener) {
BiConsumer<T, Throwable> castThrowable = (v, t) -> {
if (t == null) {
listener.accept(v, null);
} else {
assert !(t instanceof Error) : "Cannot be error";
listener.accept(v, (Exception) t);
}
};
completableFuture.whenComplete(castThrowable);
}

public boolean isDone() {
return completableFuture.isDone();
}

public boolean isCompletedExceptionally() {
return completableFuture.isCompletedExceptionally();
}

public boolean completeExceptionally(Exception ex) {
return completableFuture.completeExceptionally(ex);
}

public boolean complete(T value) {
return completableFuture.complete(value);
}
}
2 changes: 2 additions & 0 deletions libs/elasticsearch-nio/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ publishing {
}

dependencies {
compile "org.elasticsearch:elasticsearch-core:${version}"

testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
testCompile "junit:junit:${versions.junit}"
testCompile "org.hamcrest:hamcrest-all:${versions.hamcrest}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public abstract class BytesWriteHandler implements ReadWriteHandler {

private static final List<FlushOperation> EMPTY_LIST = Collections.emptyList();

public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Throwable> listener) {
public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Exception> listener) {
assert message instanceof ByteBuffer[] : "This channel only supports messages that are of type: " + ByteBuffer[].class
+ ". Found type: " + message.getClass() + ".";
return new FlushReadyWrite(context, (ByteBuffer[]) message, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

package org.elasticsearch.nio;

import org.elasticsearch.common.concurrent.CompletableContext;

import java.io.IOException;
import java.nio.channels.NetworkChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

Expand All @@ -37,7 +38,7 @@ public abstract class ChannelContext<S extends SelectableChannel & NetworkChanne

protected final S rawChannel;
private final Consumer<Exception> exceptionHandler;
private final CompletableFuture<Void> closeContext = new CompletableFuture<>();
private final CompletableContext<Void> closeContext = new CompletableContext<>();
private volatile SelectionKey selectionKey;

ChannelContext(S rawChannel, Consumer<Exception> exceptionHandler) {
Expand Down Expand Up @@ -81,8 +82,8 @@ public void closeFromSelector() throws IOException {
*
* @param listener to be called
*/
public void addCloseListener(BiConsumer<Void, Throwable> listener) {
closeContext.whenComplete(listener);
public void addCloseListener(BiConsumer<Void, Exception> listener) {
closeContext.addListener(listener);
}

public boolean isOpen() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@

public class FlushOperation {

private final BiConsumer<Void, Throwable> listener;
private final BiConsumer<Void, Exception> listener;
private final ByteBuffer[] buffers;
private final int[] offsets;
private final int length;
private int internalIndex;

public FlushOperation(ByteBuffer[] buffers, BiConsumer<Void, Throwable> listener) {
public FlushOperation(ByteBuffer[] buffers, BiConsumer<Void, Exception> listener) {
this.listener = listener;
this.buffers = buffers;
this.offsets = new int[buffers.length];
Expand All @@ -44,7 +44,7 @@ public FlushOperation(ByteBuffer[] buffers, BiConsumer<Void, Throwable> listener
length = offset;
}

public BiConsumer<Void, Throwable> getListener() {
public BiConsumer<Void, Exception> getListener() {
return listener;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class FlushReadyWrite extends FlushOperation implements WriteOperation {
private final SocketChannelContext channelContext;
private final ByteBuffer[] buffers;

FlushReadyWrite(SocketChannelContext channelContext, ByteBuffer[] buffers, BiConsumer<Void, Throwable> listener) {
FlushReadyWrite(SocketChannelContext channelContext, ByteBuffer[] buffers, BiConsumer<Void, Exception> listener) {
super(buffers, listener);
this.channelContext = channelContext;
this.buffers = buffers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public InetSocketAddress getLocalAddress() {
*
* @param listener to be called at close
*/
public void addCloseListener(BiConsumer<Void, Throwable> listener) {
public void addCloseListener(BiConsumer<Void, Exception> listener) {
getContext().addCloseListener(listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public InetSocketAddress getRemoteAddress() {
return remoteAddress;
}

public void addConnectListener(BiConsumer<Void, Throwable> listener) {
public void addConnectListener(BiConsumer<Void, Exception> listener) {
context.addConnectListener(listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public interface ReadWriteHandler {
* @param listener the listener to be called when the message is sent
* @return the write operation to be queued
*/
WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Throwable> listener);
WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Exception> listener);

/**
* This method is called on the event loop thread. It should serialize a write operation object to bytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.nio;

import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.nio.utils.ExceptionsHelper;

import java.io.IOException;
Expand All @@ -27,7 +28,6 @@
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand All @@ -48,7 +48,7 @@ public abstract class SocketChannelContext extends ChannelContext<SocketChannel>
protected final AtomicBoolean isClosing = new AtomicBoolean(false);
private final ReadWriteHandler readWriteHandler;
private final SocketSelector selector;
private final CompletableFuture<Void> connectContext = new CompletableFuture<>();
private final CompletableContext<Void> connectContext = new CompletableContext<>();
private final LinkedList<FlushOperation> pendingFlushes = new LinkedList<>();
private boolean ioException;
private boolean peerClosed;
Expand All @@ -73,8 +73,8 @@ public NioSocketChannel getChannel() {
return channel;
}

public void addConnectListener(BiConsumer<Void, Throwable> listener) {
connectContext.whenComplete(listener);
public void addConnectListener(BiConsumer<Void, Exception> listener) {
connectContext.addListener(listener);
}

public boolean isConnectComplete() {
Expand Down Expand Up @@ -121,7 +121,7 @@ public boolean connect() throws IOException {
return isConnected;
}

public void sendMessage(Object message, BiConsumer<Void, Throwable> listener) {
public void sendMessage(Object message, BiConsumer<Void, Exception> listener) {
if (isClosing.get()) {
listener.accept(null, new ClosedChannelException());
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void queueWriteInChannelBuffer(WriteOperation writeOperation) {
* @param listener to be executed
* @param value to provide to listener
*/
public <V> void executeListener(BiConsumer<V, Throwable> listener, V value) {
public <V> void executeListener(BiConsumer<V, Exception> listener, V value) {
assertOnSelectorThread();
try {
listener.accept(value, null);
Expand All @@ -154,7 +154,7 @@ public <V> void executeListener(BiConsumer<V, Throwable> listener, V value) {
* @param listener to be executed
* @param exception to provide to listener
*/
public <V> void executeFailedListener(BiConsumer<V, Throwable> listener, Exception exception) {
public <V> void executeFailedListener(BiConsumer<V, Exception> listener, Exception exception) {
assertOnSelectorThread();
try {
listener.accept(null, exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
*/
public interface WriteOperation {

BiConsumer<Void, Throwable> getListener();
BiConsumer<Void, Exception> getListener();

SocketChannelContext getChannel();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class BytesChannelContextTests extends ESTestCase {
private BytesChannelContext context;
private InboundChannelBuffer channelBuffer;
private SocketSelector selector;
private BiConsumer<Void, Throwable> listener;
private BiConsumer<Void, Exception> listener;
private int messageLength;

@Before
Expand Down Expand Up @@ -191,7 +191,7 @@ public void testPartialFlush() throws IOException {
public void testMultipleWritesPartialFlushes() throws IOException {
assertFalse(context.readyForFlush());

BiConsumer<Void, Throwable> listener2 = mock(BiConsumer.class);
BiConsumer<Void, Exception> listener2 = mock(BiConsumer.class);
FlushReadyWrite flushOperation1 = mock(FlushReadyWrite.class);
FlushReadyWrite flushOperation2 = mock(FlushReadyWrite.class);
when(flushOperation1.getBuffersToWrite()).thenReturn(new ByteBuffer[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void testCloseException() throws IOException {
if (t == null) {
throw new AssertionError("Close should not fail");
} else {
exception.set((Exception) t);
exception.set(t);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

public class FlushOperationTests extends ESTestCase {

private BiConsumer<Void, Throwable> listener;
private BiConsumer<Void, Exception> listener;

@Before
@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class SocketChannelContextTests extends ESTestCase {
private TestSocketChannelContext context;
private Consumer<Exception> exceptionHandler;
private NioSocketChannel channel;
private BiConsumer<Void, Throwable> listener;
private BiConsumer<Void, Exception> listener;
private SocketSelector selector;
private ReadWriteHandler readWriteHandler;

Expand Down Expand Up @@ -206,7 +206,7 @@ public void testFlushOpsClearedOnClose() throws Exception {

ByteBuffer[] buffer = {ByteBuffer.allocate(10)};
WriteOperation writeOperation = mock(WriteOperation.class);
BiConsumer<Void, Throwable> listener2 = mock(BiConsumer.class);
BiConsumer<Void, Exception> listener2 = mock(BiConsumer.class);
when(readWriteHandler.writeToBytes(writeOperation)).thenReturn(Arrays.asList(new FlushOperation(buffer, listener),
new FlushOperation(buffer, listener2)));
context.queueWriteOperation(writeOperation);
Expand All @@ -232,7 +232,7 @@ public void testWillPollForFlushOpsToClose() throws Exception {


ByteBuffer[] buffer = {ByteBuffer.allocate(10)};
BiConsumer<Void, Throwable> listener2 = mock(BiConsumer.class);
BiConsumer<Void, Exception> listener2 = mock(BiConsumer.class);

assertFalse(context.readyForFlush());
when(channel.isOpen()).thenReturn(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class SocketSelectorTests extends ESTestCase {
private NioSocketChannel channel;
private TestSelectionKey selectionKey;
private SocketChannelContext channelContext;
private BiConsumer<Void, Throwable> listener;
private BiConsumer<Void, Exception> listener;
private ByteBuffer[] buffers = {ByteBuffer.allocate(1)};
private Selector rawSelector;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,21 @@
package org.elasticsearch.transport.netty4;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TransportException;

import java.net.InetSocketAddress;
import java.nio.channels.ClosedSelectorException;
import java.util.concurrent.CompletableFuture;

public class NettyTcpChannel implements TcpChannel {

private final Channel channel;
private final String profile;
private final CompletableFuture<Void> closeContext = new CompletableFuture<>();
private final CompletableContext<Void> closeContext = new CompletableContext<>();

NettyTcpChannel(Channel channel, String profile) {
this.channel = channel;
Expand All @@ -51,9 +46,9 @@ public class NettyTcpChannel implements TcpChannel {
Throwable cause = f.cause();
if (cause instanceof Error) {
Netty4Utils.maybeDie(cause);
closeContext.completeExceptionally(cause);
closeContext.completeExceptionally(new Exception(cause));
} else {
closeContext.completeExceptionally(cause);
closeContext.completeExceptionally((Exception) cause);
}
}
});
Expand All @@ -71,7 +66,7 @@ public String getProfile() {

@Override
public void addCloseListener(ActionListener<Void> listener) {
closeContext.whenComplete(ActionListener.toBiConsumer(listener));
closeContext.addListener(ActionListener.toBiConsumer(listener));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public int consumeReads(InboundChannelBuffer channelBuffer) throws IOException {
}

@Override
public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Throwable> listener) {
public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Exception> listener) {
assert message instanceof NioHttpResponse : "This channel only supports messages that are of type: "
+ NioHttpResponse.class + ". Found type: " + message.getClass() + ".";
return new HttpWriteOperation(context, (NioHttpResponse) message, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ public class HttpWriteOperation implements WriteOperation {

private final SocketChannelContext channelContext;
private final NioHttpResponse response;
private final BiConsumer<Void, Throwable> listener;
private final BiConsumer<Void, Exception> listener;

HttpWriteOperation(SocketChannelContext channelContext, NioHttpResponse response, BiConsumer<Void, Throwable> listener) {
HttpWriteOperation(SocketChannelContext channelContext, NioHttpResponse response, BiConsumer<Void, Exception> listener) {
this.channelContext = channelContext;
this.response = response;
this.listener = listener;
}

@Override
public BiConsumer<Void, Throwable> getListener() {
public BiConsumer<Void, Exception> getListener() {
return listener;
}

Expand Down
Loading