Skip to content

Commit

Permalink
Add bind support for DefaultPooledConnectionProvider
Browse files Browse the repository at this point in the history
Related to #1531
  • Loading branch information
violetagg committed Jun 25, 2024
1 parent fe241fe commit e7a1e75
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import java.net.SocketAddress;
import java.time.Clock;
import java.time.Duration;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiPredicate;
Expand All @@ -27,6 +28,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoop;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.AttributeKey;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -92,7 +94,7 @@ protected InstrumentedPool<PooledConnection> createPool(
TransportConfig config,
PoolFactory<PooledConnection> poolFactory,
SocketAddress remoteAddress,
AddressResolverGroup<?> resolverGroup) {
@Nullable AddressResolverGroup<?> resolverGroup) {
return new PooledConnectionAllocator(config, poolFactory, remoteAddress, resolverGroup).pool;
}

Expand All @@ -102,7 +104,7 @@ protected InstrumentedPool<PooledConnection> createPool(
TransportConfig config,
PoolFactory<PooledConnection> poolFactory,
SocketAddress remoteAddress,
AddressResolverGroup<?> resolverGroup) {
@Nullable AddressResolverGroup<?> resolverGroup) {
return new PooledConnectionAllocator(id, name, config, poolFactory, remoteAddress, resolverGroup).pool;
}

Expand Down Expand Up @@ -511,7 +513,7 @@ static final class PooledConnectionAllocator {
TransportConfig config,
PoolFactory<PooledConnection> provider,
SocketAddress remoteAddress,
AddressResolverGroup<?> resolver) {
@Nullable AddressResolverGroup<?> resolver) {
this(null, null, config, provider, remoteAddress, resolver);
}

Expand All @@ -521,7 +523,7 @@ static final class PooledConnectionAllocator {
TransportConfig config,
PoolFactory<PooledConnection> provider,
SocketAddress remoteAddress,
AddressResolverGroup<?> resolver) {
@Nullable AddressResolverGroup<?> resolver) {
this.config = config;
this.remoteAddress = remoteAddress;
this.resolver = resolver;
Expand All @@ -536,12 +538,20 @@ Publisher<PooledConnection> connectChannel() {
PooledConnectionInitializer initializer = new PooledConnectionInitializer(sink);
EventLoop callerEventLoop = sink.contextView().hasKey(CONTEXT_CALLER_EVENTLOOP) ?
sink.contextView().get(CONTEXT_CALLER_EVENTLOOP) : null;
if (callerEventLoop != null) {
TransportConnector.connect(config, remoteAddress, resolver, initializer, callerEventLoop, sink.contextView())
.subscribe(initializer);
if (resolver != null) {
if (callerEventLoop != null) {
TransportConnector.connect(config, remoteAddress, resolver, initializer, callerEventLoop, sink.contextView())
.subscribe(initializer);
}
else {
TransportConnector.connect(config, remoteAddress, resolver, initializer, sink.contextView()).subscribe(initializer);
}
}
else {
TransportConnector.connect(config, remoteAddress, resolver, initializer, sink.contextView()).subscribe(initializer);
Objects.requireNonNull(config.bindAddress(), "bindAddress");
SocketAddress local = Objects.requireNonNull(config.bindAddress().get(), "Bind Address supplier returned null");
TransportConnector.bind(config, initializer, local, remoteAddress instanceof DomainSocketAddress)
.subscribe(initializer);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ public final Mono<? extends Connection> acquire(
Objects.requireNonNull(config, "config");
Objects.requireNonNull(connectionObserver, "connectionObserver");
Objects.requireNonNull(remote, "remoteAddress");
Objects.requireNonNull(resolverGroup, "resolverGroup");
return Mono.create(sink -> {
SocketAddress remoteAddress = Objects.requireNonNull(remote.get(), "Remote Address supplier returned null");
PoolKey holder = new PoolKey(remoteAddress, config.channelHash());
Expand Down Expand Up @@ -307,14 +306,14 @@ protected abstract InstrumentedPool<T> createPool(
TransportConfig config,
PoolFactory<T> poolFactory,
SocketAddress remoteAddress,
AddressResolverGroup<?> resolverGroup);
@Nullable AddressResolverGroup<?> resolverGroup);

protected InstrumentedPool<T> createPool(
String id,
TransportConfig config,
PoolFactory<T> poolFactory,
SocketAddress remoteAddress,
AddressResolverGroup<?> resolverGroup) {
@Nullable AddressResolverGroup<?> resolverGroup) {
return createPool(config, poolFactory, remoteAddress, resolverGroup);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ protected InstrumentedPool<Connection> createPool(
TransportConfig config,
PooledConnectionProvider.PoolFactory<Connection> poolFactory,
SocketAddress remoteAddress,
AddressResolverGroup<?> resolverGroup) {
@Nullable AddressResolverGroup<?> resolverGroup) {
return new PooledConnectionAllocator(parent, config, poolFactory, remoteAddress, resolverGroup).pool;
}

Expand All @@ -145,7 +145,7 @@ protected InstrumentedPool<Connection> createPool(
TransportConfig config,
PooledConnectionProvider.PoolFactory<Connection> poolFactory,
SocketAddress remoteAddress,
AddressResolverGroup<?> resolverGroup) {
@Nullable AddressResolverGroup<?> resolverGroup) {
return new PooledConnectionAllocator(id, name(), parent, config, poolFactory, remoteAddress, resolverGroup).pool;
}

Expand Down Expand Up @@ -549,7 +549,7 @@ static final class PooledConnectionAllocator {
TransportConfig config,
PoolFactory<Connection> poolFactory,
SocketAddress remoteAddress,
AddressResolverGroup<?> resolver) {
@Nullable AddressResolverGroup<?> resolver) {
this(null, null, parent, config, poolFactory, remoteAddress, resolver);
}

Expand All @@ -560,7 +560,7 @@ static final class PooledConnectionAllocator {
TransportConfig config,
PoolFactory<Connection> poolFactory,
SocketAddress remoteAddress,
AddressResolverGroup<?> resolver) {
@Nullable AddressResolverGroup<?> resolver) {
this.parent = parent;
this.config = (HttpClientConfig) config;
this.remoteAddress = remoteAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ else if (_config.checkProtocol(HttpClientConfig.h3)) {
.then(_config.connectionObserver())
.then(new HttpIOHandlerObserver(sink, handler));

AddressResolverGroup<?> resolver = _config.resolverInternal();
AddressResolverGroup<?> resolver =
!_config.checkProtocol(HttpClientConfig.h3) ? _config.resolverInternal() : null;

_config.httpConnectionProvider()
.acquire(_config, observer, handler, resolver)
Expand Down

0 comments on commit e7a1e75

Please sign in to comment.