Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: improve netty usage #404

Merged
merged 4 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion providers/flagd/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
<!-- we only support unix sockets on linux, via epoll native lib -->
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>4.1.95.Final</version>
<version>4.1.96.Final</version>
<!-- TODO: with 5+ (still alpha), arm is support and we should package multiple versions -->
<classifier>linux-x86_64</classifier>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.StreamObserver;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
Expand Down Expand Up @@ -192,6 +193,11 @@ private static void busyWaitAndCheck(final Long deadline, final AtomicBoolean ch
private static ManagedChannel nettyChannel(final FlagdOptions options) {
// we have a socket path specified, build a channel with a unix socket
if (options.getSocketPath() != null) {
// check epoll availability
if (!Epoll.isAvailable()) {
throw new IllegalStateException("unix socket cannot be used", Epoll.unavailabilityCause());
}

return NettyChannelBuilder
.forAddress(new DomainSocketAddress(options.getSocketPath()))
.eventLoopGroup(new EpollEventLoopGroup())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.MockedConstruction;
Expand Down Expand Up @@ -124,8 +127,9 @@ void initialization_fail_with_timeout() throws Exception {
}

@Test
toddbaert marked this conversation as resolved.
Show resolved Hide resolved
void path_arg_should_build_domain_socket_with_correct_path() {
final String path = "/some/path";
void host_and_port_arg_should_build_tcp_socket() {
final String host = "host.com";
final int port = 1234;

ServiceGrpc.ServiceBlockingStub mockBlockingStub = mock(ServiceGrpc.ServiceBlockingStub.class);
ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class);
Expand All @@ -139,31 +143,25 @@ void path_arg_should_build_domain_socket_with_correct_path() {

try (MockedStatic<NettyChannelBuilder> mockStaticChannelBuilder = mockStatic(NettyChannelBuilder.class)) {

try (MockedConstruction<EpollEventLoopGroup> mockEpollEventLoopGroup = mockConstruction(
EpollEventLoopGroup.class,
(mock, context) -> {
})) {
when(NettyChannelBuilder.forAddress(any(DomainSocketAddress.class))).thenReturn(mockChannelBuilder);
mockStaticChannelBuilder.when(() -> NettyChannelBuilder
.forAddress(anyString(), anyInt())).thenReturn(mockChannelBuilder);

new GrpcConnector(FlagdOptions.builder().socketPath(path).build(), null, null);
final FlagdOptions flagdOptions = FlagdOptions.builder().host(host).port(port).tls(false).build();
new GrpcConnector(flagdOptions, null, null);

// verify path matches
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder
.forAddress(argThat((DomainSocketAddress d) -> {
assertEquals(d.path(), path); // path should match
return true;
})), times(1));
}
// verify host/port matches
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder
.forAddress(host, port), times(1));
}
}
}

@Test
void no_args_socket_env_should_build_domain_socket_with_correct_path() throws Exception {
final String path = "/some/other/path";

new EnvironmentVariables("FLAGD_SOCKET_PATH", path).execute(() -> {
void no_args_host_and_port_env_set_should_build_tcp_socket() throws Exception {
final String host = "server.com";
final int port = 4321;

new EnvironmentVariables("FLAGD_HOST", host, "FLAGD_PORT", String.valueOf(port)).execute(() -> {
ServiceGrpc.ServiceBlockingStub mockBlockingStub = mock(ServiceGrpc.ServiceBlockingStub.class);
ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class);
NettyChannelBuilder mockChannelBuilder = getMockChannelBuilderSocket();
Expand All @@ -177,30 +175,27 @@ void no_args_socket_env_should_build_domain_socket_with_correct_path() throws Ex
try (MockedStatic<NettyChannelBuilder> mockStaticChannelBuilder = mockStatic(
NettyChannelBuilder.class)) {

try (MockedConstruction<EpollEventLoopGroup> mockEpollEventLoopGroup = mockConstruction(
EpollEventLoopGroup.class,
(mock, context) -> {
})) {
mockStaticChannelBuilder.when(() -> NettyChannelBuilder
.forAddress(any(DomainSocketAddress.class))).thenReturn(mockChannelBuilder);
mockStaticChannelBuilder.when(() -> NettyChannelBuilder
.forAddress(anyString(), anyInt())).thenReturn(mockChannelBuilder);

new GrpcConnector(FlagdOptions.builder().build(), null, null);
new GrpcConnector(FlagdOptions.builder().build(), null, null);

//verify path matches & called times(= 1 as we rely on reusable channel)
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder
.forAddress(argThat((DomainSocketAddress d) -> {
return d.path() == path;
})), times(1));
}
// verify host/port matches & called times(= 1 as we rely on reusable channel)
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder.
forAddress(host, port), times(1));
}
}
});
}


/**
* OS Specific test - This test is valid only on Linux system as it rely on epoll availability
* */
@Test
void host_and_port_arg_should_build_tcp_socket() {
final String host = "host.com";
final int port = 1234;
@EnabledOnOs(OS.LINUX)
void path_arg_should_build_domain_socket_with_correct_path() {
final String path = "/some/path";

ServiceGrpc.ServiceBlockingStub mockBlockingStub = mock(ServiceGrpc.ServiceBlockingStub.class);
ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class);
Expand All @@ -214,25 +209,35 @@ void host_and_port_arg_should_build_tcp_socket() {

try (MockedStatic<NettyChannelBuilder> mockStaticChannelBuilder = mockStatic(NettyChannelBuilder.class)) {

mockStaticChannelBuilder.when(() -> NettyChannelBuilder
.forAddress(anyString(), anyInt())).thenReturn(mockChannelBuilder);
try (MockedConstruction<EpollEventLoopGroup> mockEpollEventLoopGroup = mockConstruction(
EpollEventLoopGroup.class,
(mock, context) -> {
})) {
when(NettyChannelBuilder.forAddress(any(DomainSocketAddress.class))).thenReturn(mockChannelBuilder);

final FlagdOptions flagdOptions = FlagdOptions.builder().host(host).port(port).tls(false).build();
new GrpcConnector(flagdOptions, null, null);
new GrpcConnector(FlagdOptions.builder().socketPath(path).build(), null, null);

// verify host/port matches
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder
.forAddress(host, port), times(1));
// verify path matches
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder
.forAddress(argThat((DomainSocketAddress d) -> {
assertEquals(d.path(), path); // path should match
return true;
})), times(1));
}
}
}
}

/**
* OS Specific test - This test is valid only on Linux system as it rely on epoll availability
* */
@Test
void no_args_host_and_port_env_set_should_build_tcp_socket() throws Exception {
final String host = "server.com";
final int port = 4321;
@EnabledOnOs(OS.LINUX)
void no_args_socket_env_should_build_domain_socket_with_correct_path() throws Exception {
final String path = "/some/other/path";

new EnvironmentVariables("FLAGD_SOCKET_PATH", path).execute(() -> {

new EnvironmentVariables("FLAGD_HOST", host, "FLAGD_PORT", String.valueOf(port)).execute(() -> {
ServiceGrpc.ServiceBlockingStub mockBlockingStub = mock(ServiceGrpc.ServiceBlockingStub.class);
ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class);
NettyChannelBuilder mockChannelBuilder = getMockChannelBuilderSocket();
Expand All @@ -246,14 +251,21 @@ void no_args_host_and_port_env_set_should_build_tcp_socket() throws Exception {
try (MockedStatic<NettyChannelBuilder> mockStaticChannelBuilder = mockStatic(
NettyChannelBuilder.class)) {

mockStaticChannelBuilder.when(() -> NettyChannelBuilder
.forAddress(anyString(), anyInt())).thenReturn(mockChannelBuilder);
try (MockedConstruction<EpollEventLoopGroup> mockEpollEventLoopGroup = mockConstruction(
EpollEventLoopGroup.class,
(mock, context) -> {
})) {
mockStaticChannelBuilder.when(() -> NettyChannelBuilder
.forAddress(any(DomainSocketAddress.class))).thenReturn(mockChannelBuilder);

new GrpcConnector(FlagdOptions.builder().build(), null, null);
new GrpcConnector(FlagdOptions.builder().build(), null, null);

// verify host/port matches & called times(= 1 as we rely on reusable channel)
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder.
forAddress(host, port), times(1));
//verify path matches & called times(= 1 as we rely on reusable channel)
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder
.forAddress(argThat((DomainSocketAddress d) -> {
return d.path() == path;
})), times(1));
}
}
}
});
Expand Down
Loading