Skip to content

Commit

Permalink
feat: added custom grpc resolver
Browse files Browse the repository at this point in the history
added custom gRPC resolver to support envoy proxy

Signed-off-by: Pradeep Mishra <pradeepbbl@gmail.com>
  • Loading branch information
pradeepbbl committed Oct 8, 2024
1 parent 8c6e0ad commit fb0f789
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public final class Config {
static final String OFFLINE_SOURCE_PATH = "FLAGD_OFFLINE_FLAG_SOURCE_PATH";
static final String KEEP_ALIVE_MS_ENV_VAR_NAME_OLD = "FLAGD_KEEP_ALIVE_TIME";
static final String KEEP_ALIVE_MS_ENV_VAR_NAME = "FLAGD_KEEP_ALIVE_TIME_MS";
static final String GRPC_TARGET_ENV_VAR_NAME = "FLAGD_GRPC_TARGET";

static final String RESOLVER_RPC = "rpc";
static final String RESOLVER_IN_PROCESS = "in-process";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,18 @@ public class FlagdOptions {
@Builder.Default
private String offlineFlagSourcePath = fallBackToEnvOrDefault(Config.OFFLINE_SOURCE_PATH, null);


/**
* gRPC custom target string.
* <p>
* Setting this will allow user to use custom gRPC name resolver at present
* we are supporting all core resolver along with a custom resolver for envoy proxy
* resolution. For more visit (https://grpc.io/docs/guides/custom-name-resolution/)
*/
@Builder.Default
private String targetUri = fallBackToEnvOrDefault(Config.GRPC_TARGET_ENV_VAR_NAME, null);


/**
* Function providing an EvaluationContext to mix into every evaluations.
* The sync-metadata response
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package dev.openfeature.contrib.providers.flagd.resolver.common;

import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.nameresolvers.EnvoyResolverProvider;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.ManagedChannel;
import io.grpc.NameResolverRegistry;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.channel.epoll.Epoll;
Expand All @@ -13,6 +15,8 @@

import javax.net.ssl.SSLException;
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -50,9 +54,21 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {

// build a TCP socket
try {
// Register custom resolver
if (isEnvoyTarget(options.getTargetUri())) {
NameResolverRegistry.getDefaultRegistry().register(new EnvoyResolverProvider());
}

// default to current `dns` resolution i.e. <host>:<port>, if valid / supported
// target string use the user provided target uri.
final String defaultTarget = String.format("%s:%s", options.getHost(), options.getPort());
final String targetUri = isValidTargetUri(options.getTargetUri()) ? options.getTargetUri() :
defaultTarget;

final NettyChannelBuilder builder = NettyChannelBuilder
.forAddress(options.getHost(), options.getPort())
.forTarget(targetUri)
.keepAliveTime(keepAliveMs, TimeUnit.MILLISECONDS);

if (options.isTls()) {
SslContextBuilder sslContext = GrpcSslContexts.forClient();

Expand All @@ -78,6 +94,46 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
SslConfigException sslConfigException = new SslConfigException("Error with SSL configuration.");
sslConfigException.initCause(ssle);
throw sslConfigException;
} catch (IllegalArgumentException argumentException) {
GenericConfigException genericConfigException = new GenericConfigException("Error with gRPC target string " +
"configuration");
genericConfigException.initCause(argumentException);
throw genericConfigException;
}
}

private static boolean isValidTargetUri(String targetUri) {
if (targetUri == null) {
return false;
}

try {
final String scheme = new URI(targetUri).getScheme();
if (scheme.equals("envoy") || scheme.equals("dns") || scheme.equals("xds") || scheme.equals("uds")) {
return true;
}
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid target string", e);
}

return false;
}

private static boolean isEnvoyTarget(String targetUri) {
if (targetUri == null) {
return false;
}

try {
final String scheme = new URI(targetUri).getScheme();
if (scheme.equals("envoy")) {
return true;
}
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid target string", e);
}

return false;

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package dev.openfeature.contrib.providers.flagd.resolver.common;

/**
* Custom exception for invalid gRPC configurations.
*/

public class GenericConfigException extends RuntimeException {
public GenericConfigException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package dev.openfeature.contrib.providers.flagd.resolver.common.nameresolvers;

import io.grpc.EquivalentAddressGroup;
import io.grpc.NameResolver;
import java.net.InetSocketAddress;
import io.grpc.Attributes;
import io.grpc.Status;
import java.net.URI;
import java.util.Collections;
import java.util.List;

/**
* Envoy NameResolver, will always override the authority with the specified authority and
* use the socketAddress to connect.
* <p>
* Custom URI Scheme:
* <p>
* envoy://[proxy-agent-host]:[proxy-agent-port]/[service-name]
* <p>
* `service-name` is used as authority instead host
*/
public class EnvoyResolver extends NameResolver {
private final URI uri;
private final String authority;
private Listener2 listener;

public EnvoyResolver(URI targetUri) {
this.uri = targetUri;
this.authority = targetUri.getPath().substring(1);
}

@Override
public String getServiceAuthority() {
return authority;
}

@Override
public void shutdown() {
}

@Override
public void start(Listener2 listener) {
this.listener = listener;
this.resolve();
}

@Override
public void refresh() {
this.resolve();
}

private void resolve() {
try {
InetSocketAddress address = new InetSocketAddress(this.uri.getHost(), this.uri.getPort());
Attributes addressGroupAttributes = Attributes.newBuilder()
.set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, this.authority)
.build();
List<EquivalentAddressGroup> equivalentAddressGroup = Collections.singletonList(
new EquivalentAddressGroup(address, addressGroupAttributes)
);
ResolutionResult resolutionResult = ResolutionResult.newBuilder()
.setAddresses(equivalentAddressGroup)
.build();
this.listener.onResult(resolutionResult);
} catch (Exception e) {
this.listener.onError(Status.UNAVAILABLE.withDescription("Unable to resolve host ").withCause(e));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package dev.openfeature.contrib.providers.flagd.resolver.common.nameresolvers;

import io.grpc.NameResolver;
import io.grpc.NameResolverProvider;
import java.net.URI;

public class EnvoyResolverProvider extends NameResolverProvider {
static final String ENVOY_SCHEME = "envoy";
static final String DEFAULT_SERVICE_NAME = "undefined";
@Override
protected boolean isAvailable() {
return true;
}

@Override
protected int priority() {
return 6;
}

@Override
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
if (!ENVOY_SCHEME.equals(targetUri.getScheme())) {
return null;
}

if (!isValidPath(targetUri.getPath()) || targetUri.getHost() == null || targetUri.getPort() == -1 ) {
throw new IllegalArgumentException("Incorrectly formatted target uri; "
+ "expected: '" + ENVOY_SCHEME + ":[//]<proxy-agent-host>:<proxy-agent-port>/<service-name>';"
+ "but was '" + targetUri + "'");
}

return new EnvoyResolver(targetUri);
}

@Override
public String getDefaultScheme() {
return ENVOY_SCHEME;
}

private static boolean isValidPath(String path) {
return !path.isEmpty() && !path.substring(1).isEmpty()
&& !path.substring(1).contains("/");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ void TestBuilderOptions() {
.openTelemetry(openTelemetry)
.customConnector(connector)
.resolverType(Resolver.IN_PROCESS)
.targetUri("dns:///localhost:8016")
.keepAlive(1000)
.build();

Expand All @@ -69,6 +70,7 @@ void TestBuilderOptions() {
assertEquals(openTelemetry, flagdOptions.getOpenTelemetry());
assertEquals(connector, flagdOptions.getCustomConnector());
assertEquals(Resolver.IN_PROCESS, flagdOptions.getResolverType());
assertEquals("dns:///localhost:8016", flagdOptions.getTargetUri());
assertEquals(1000, flagdOptions.getKeepAlive());
}

Expand Down Expand Up @@ -187,4 +189,12 @@ void testRpcProviderFromEnv_portConfigured_usesConfiguredPort() {
assertThat(flagdOptions.getPort()).isEqualTo(1534);

}

@Test
@SetEnvironmentVariable(key = GRPC_TARGET_ENV_VAR_NAME, value = "envoy://localhost:1234/foo.service")
void testTargetOverrideFromEnv() {
FlagdOptions flagdOptions = FlagdOptions.builder().build();

assertThat(flagdOptions.getTargetUri()).isEqualTo("envoy://localhost:1234/foo.service");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package dev.openfeature.contrib.providers.flagd.resolver.common.nameresolvers;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.stream.Stream;

class EnvoyResolverProviderTest {
private final EnvoyResolverProvider provider = new EnvoyResolverProvider();

@Test
void envoyProviderTestScheme() {
Assertions.assertTrue(provider.isAvailable());
Assertions.assertNotNull(provider.newNameResolver(URI.create("envoy://localhost:1234/foo.service"),
null));
Assertions.assertNull(provider.newNameResolver(URI.create("invalid-scheme://localhost:1234/foo.service"),
null));
}


@ParameterizedTest
@MethodSource("getInvalidPaths")
void invalidTargetUriTests(String mockUri) {
Exception exception = Assertions.assertThrows(IllegalArgumentException.class, () -> {
provider.newNameResolver(URI.create(mockUri), null);
});

Assertions.assertTrue(exception.toString().contains("Incorrectly formatted target uri"));
}

private static Stream<Arguments> getInvalidPaths() {
return Stream.of(
Arguments.of("envoy://localhost:1234/test.service/test"),
Arguments.of("envoy://localhost:1234/"),
Arguments.of("envoy://localhost:1234"),
Arguments.of("envoy://localhost/test.service"),
Arguments.of("envoy:///test.service")
);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package dev.openfeature.contrib.providers.flagd.resolver.common.nameresolvers;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.net.URI;

class EnvoyResolverTest {
@Test
void envoyResolverTest() {
// given
EnvoyResolver envoyResolver = new EnvoyResolver(URI.create("envoy://localhost:1234/foo.service"));

// then
Assertions.assertEquals("foo.service", envoyResolver.getServiceAuthority());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ void stream_does_not_fail_with_deadline_error() throws Exception {
void host_and_port_arg_should_build_tcp_socket() {
final String host = "host.com";
final int port = 1234;
final String targetUri = String.format("%s:%s", host, port);

ServiceGrpc.ServiceBlockingStub mockBlockingStub = mock(ServiceGrpc.ServiceBlockingStub.class);
ServiceGrpc.ServiceStub mockStub = createServiceStubMock();
Expand All @@ -206,14 +207,14 @@ void host_and_port_arg_should_build_tcp_socket() {
try (MockedStatic<NettyChannelBuilder> mockStaticChannelBuilder = mockStatic(NettyChannelBuilder.class)) {

mockStaticChannelBuilder.when(() -> NettyChannelBuilder
.forAddress(anyString(), anyInt())).thenReturn(mockChannelBuilder);
.forTarget(anyString())).thenReturn(mockChannelBuilder);

final FlagdOptions flagdOptions = FlagdOptions.builder().host(host).port(port).tls(false).build();
new GrpcConnector(flagdOptions, null, null, null);

// verify host/port matches
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder
.forAddress(host, port), times(1));
.forTarget(String.format(targetUri)), times(1));
}
}
}
Expand All @@ -222,6 +223,7 @@ void host_and_port_arg_should_build_tcp_socket() {
void no_args_host_and_port_env_set_should_build_tcp_socket() throws Exception {
final String host = "server.com";
final int port = 4321;
final String targetUri = String.format("%s:%s", host, port);

new EnvironmentVariables("FLAGD_HOST", host, "FLAGD_PORT", String.valueOf(port)).execute(() -> {
ServiceGrpc.ServiceBlockingStub mockBlockingStub = mock(ServiceGrpc.ServiceBlockingStub.class);
Expand All @@ -238,12 +240,12 @@ void no_args_host_and_port_env_set_should_build_tcp_socket() throws Exception {
NettyChannelBuilder.class)) {

mockStaticChannelBuilder.when(() -> NettyChannelBuilder
.forAddress(anyString(), anyInt())).thenReturn(mockChannelBuilder);
.forTarget(anyString())).thenReturn(mockChannelBuilder);

new GrpcConnector(FlagdOptions.builder().build(), null, null, null);

// verify host/port matches & called times(= 1 as we rely on reusable channel)
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder.forAddress(host, port), times(1));
mockStaticChannelBuilder.verify(() -> NettyChannelBuilder.forTarget(targetUri), times(1));
}
}
});
Expand Down

0 comments on commit fb0f789

Please sign in to comment.