Skip to content

Commit

Permalink
chore: improve netty usage (#404)
Browse files Browse the repository at this point in the history
Signed-off-by: Kavindu Dodanduwa <kavindudodanduwa@gmail.com>
  • Loading branch information
Kavindu-Dodan authored Aug 21, 2023
1 parent 2c1060c commit 66482a0
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 53 deletions.
2 changes: 1 addition & 1 deletion providers/flagd/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
<!-- we only support unix sockets on linux, via epoll native lib -->
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>4.1.95.Final</version>
<version>4.1.96.Final</version>
<!-- TODO: with 5+ (still alpha), arm is support and we should package multiple versions -->
<classifier>linux-x86_64</classifier>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.StreamObserver;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
Expand Down Expand Up @@ -192,6 +193,11 @@ private static void busyWaitAndCheck(final Long deadline, final AtomicBoolean ch
private static ManagedChannel nettyChannel(final FlagdOptions options) {
// we have a socket path specified, build a channel with a unix socket
if (options.getSocketPath() != null) {
// check epoll availability
if (!Epoll.isAvailable()) {
throw new IllegalStateException("unix socket cannot be used", Epoll.unavailabilityCause());
}

return NettyChannelBuilder
.forAddress(new DomainSocketAddress(options.getSocketPath()))
.eventLoopGroup(new EpollEventLoopGroup())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.MockedConstruction;
Expand Down Expand Up @@ -124,8 +127,9 @@ void initialization_fail_with_timeout() throws Exception {
}

@Test
void path_arg_should_build_domain_socket_with_correct_path() {
final String path = "/some/path";
void host_and_port_arg_should_build_tcp_socket() {
final String host = "host.com";
final int port = 1234;

ServiceGrpc.ServiceBlockingStub mockBlockingStub = mock(ServiceGrpc.ServiceBlockingStub.class);
ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class);
Expand All @@ -139,31 +143,25 @@ void path_arg_should_build_domain_socket_with_correct_path() {

try (MockedStatic<NettyChannelBuilder> mockStaticChannelBuilder = mockStatic(NettyChannelBuilder.class)) {

try (MockedConstruction<EpollEventLoopGroup> mockEpollEventLoopGroup = mockConstruction(
EpollEventLoopGroup.class,
(mock, context) -> {
})) {
when(NettyChannelBuilder.forAddress(any(DomainSocketAddress.class))).thenReturn(mockChannelBuilder);
mockStaticChannelBuilder.when(() -> NettyChannelBuilder
.forAddress(anyString(), anyInt())).thenReturn(mockChannelBuilder);

new GrpcConnector(FlagdOptions.builder().socketPath(path).build(), null, null);
final FlagdOptions flagdOptions = FlagdOptions.builder().host(host).port(port).tls(false).build();
new GrpcConnector(flagdOptions, null, null);

// verify path matches
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder
.forAddress(argThat((DomainSocketAddress d) -> {
assertEquals(d.path(), path); // path should match
return true;
})), times(1));
}
// verify host/port matches
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder
.forAddress(host, port), times(1));
}
}
}

@Test
void no_args_socket_env_should_build_domain_socket_with_correct_path() throws Exception {
final String path = "/some/other/path";

new EnvironmentVariables("FLAGD_SOCKET_PATH", path).execute(() -> {
void no_args_host_and_port_env_set_should_build_tcp_socket() throws Exception {
final String host = "server.com";
final int port = 4321;

new EnvironmentVariables("FLAGD_HOST", host, "FLAGD_PORT", String.valueOf(port)).execute(() -> {
ServiceGrpc.ServiceBlockingStub mockBlockingStub = mock(ServiceGrpc.ServiceBlockingStub.class);
ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class);
NettyChannelBuilder mockChannelBuilder = getMockChannelBuilderSocket();
Expand All @@ -177,30 +175,27 @@ void no_args_socket_env_should_build_domain_socket_with_correct_path() throws Ex
try (MockedStatic<NettyChannelBuilder> mockStaticChannelBuilder = mockStatic(
NettyChannelBuilder.class)) {

try (MockedConstruction<EpollEventLoopGroup> mockEpollEventLoopGroup = mockConstruction(
EpollEventLoopGroup.class,
(mock, context) -> {
})) {
mockStaticChannelBuilder.when(() -> NettyChannelBuilder
.forAddress(any(DomainSocketAddress.class))).thenReturn(mockChannelBuilder);
mockStaticChannelBuilder.when(() -> NettyChannelBuilder
.forAddress(anyString(), anyInt())).thenReturn(mockChannelBuilder);

new GrpcConnector(FlagdOptions.builder().build(), null, null);
new GrpcConnector(FlagdOptions.builder().build(), null, null);

//verify path matches & called times(= 1 as we rely on reusable channel)
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder
.forAddress(argThat((DomainSocketAddress d) -> {
return d.path() == path;
})), times(1));
}
// verify host/port matches & called times(= 1 as we rely on reusable channel)
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder.
forAddress(host, port), times(1));
}
}
});
}


/**
* OS Specific test - This test is valid only on Linux system as it rely on epoll availability
* */
@Test
void host_and_port_arg_should_build_tcp_socket() {
final String host = "host.com";
final int port = 1234;
@EnabledOnOs(OS.LINUX)
void path_arg_should_build_domain_socket_with_correct_path() {
final String path = "/some/path";

ServiceGrpc.ServiceBlockingStub mockBlockingStub = mock(ServiceGrpc.ServiceBlockingStub.class);
ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class);
Expand All @@ -214,25 +209,35 @@ void host_and_port_arg_should_build_tcp_socket() {

try (MockedStatic<NettyChannelBuilder> mockStaticChannelBuilder = mockStatic(NettyChannelBuilder.class)) {

mockStaticChannelBuilder.when(() -> NettyChannelBuilder
.forAddress(anyString(), anyInt())).thenReturn(mockChannelBuilder);
try (MockedConstruction<EpollEventLoopGroup> mockEpollEventLoopGroup = mockConstruction(
EpollEventLoopGroup.class,
(mock, context) -> {
})) {
when(NettyChannelBuilder.forAddress(any(DomainSocketAddress.class))).thenReturn(mockChannelBuilder);

final FlagdOptions flagdOptions = FlagdOptions.builder().host(host).port(port).tls(false).build();
new GrpcConnector(flagdOptions, null, null);
new GrpcConnector(FlagdOptions.builder().socketPath(path).build(), null, null);

// verify host/port matches
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder
.forAddress(host, port), times(1));
// verify path matches
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder
.forAddress(argThat((DomainSocketAddress d) -> {
assertEquals(d.path(), path); // path should match
return true;
})), times(1));
}
}
}
}

/**
* OS Specific test - This test is valid only on Linux system as it rely on epoll availability
* */
@Test
void no_args_host_and_port_env_set_should_build_tcp_socket() throws Exception {
final String host = "server.com";
final int port = 4321;
@EnabledOnOs(OS.LINUX)
void no_args_socket_env_should_build_domain_socket_with_correct_path() throws Exception {
final String path = "/some/other/path";

new EnvironmentVariables("FLAGD_SOCKET_PATH", path).execute(() -> {

new EnvironmentVariables("FLAGD_HOST", host, "FLAGD_PORT", String.valueOf(port)).execute(() -> {
ServiceGrpc.ServiceBlockingStub mockBlockingStub = mock(ServiceGrpc.ServiceBlockingStub.class);
ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class);
NettyChannelBuilder mockChannelBuilder = getMockChannelBuilderSocket();
Expand All @@ -246,14 +251,21 @@ void no_args_host_and_port_env_set_should_build_tcp_socket() throws Exception {
try (MockedStatic<NettyChannelBuilder> mockStaticChannelBuilder = mockStatic(
NettyChannelBuilder.class)) {

mockStaticChannelBuilder.when(() -> NettyChannelBuilder
.forAddress(anyString(), anyInt())).thenReturn(mockChannelBuilder);
try (MockedConstruction<EpollEventLoopGroup> mockEpollEventLoopGroup = mockConstruction(
EpollEventLoopGroup.class,
(mock, context) -> {
})) {
mockStaticChannelBuilder.when(() -> NettyChannelBuilder
.forAddress(any(DomainSocketAddress.class))).thenReturn(mockChannelBuilder);

new GrpcConnector(FlagdOptions.builder().build(), null, null);
new GrpcConnector(FlagdOptions.builder().build(), null, null);

// verify host/port matches & called times(= 1 as we rely on reusable channel)
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder.
forAddress(host, port), times(1));
//verify path matches & called times(= 1 as we rely on reusable channel)
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder
.forAddress(argThat((DomainSocketAddress d) -> {
return d.path() == path;
})), times(1));
}
}
}
});
Expand Down

0 comments on commit 66482a0

Please sign in to comment.