Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add upstream and downstream info in parent read callbacks in tcp too #9949

Merged
merged 5 commits into from
Feb 14, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt
TransportSocketPtr&& transport_socket, bool connected)
: ConnectionImplBase(dispatcher, next_global_id_++),
transport_socket_(std::move(transport_socket)), socket_(std::move(socket)),
filter_manager_(*this), stream_info_(dispatcher.timeSource()),
stream_info_(dispatcher.timeSource()), filter_manager_(*this),
write_buffer_(
dispatcher.getWatermarkFactory().create([this]() -> void { this->onLowWatermark(); },
[this]() -> void { this->onHighWatermark(); })),
Expand Down
2 changes: 1 addition & 1 deletion source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback

TransportSocketPtr transport_socket_;
ConnectionSocketPtr socket_;
FilterManagerImpl filter_manager_;
StreamInfo::StreamInfoImpl stream_info_;
FilterManagerImpl filter_manager_;

Buffer::OwnedImpl read_buffer_;
// This must be a WatermarkBuffer, but as it is created by a factory the ConnectionImpl only has
Expand Down
10 changes: 7 additions & 3 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,9 @@ UpstreamDrainManager& Config::drainManager() {
return upstream_drain_manager_slot_->getTyped<UpstreamDrainManager>();
}

Filter::Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager,
TimeSource& time_source)
Filter::Filter(ConfigSharedPtr config, Upstream::ClusterManager& cluster_manager, TimeSource&)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TimeSource& is no longer needed in the signature then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

: config_(config), cluster_manager_(cluster_manager), downstream_callbacks_(*this),
upstream_callbacks_(new UpstreamCallbacks(this)), stream_info_(time_source) {
upstream_callbacks_(new UpstreamCallbacks(this)) {
ASSERT(config != nullptr);
}

Expand Down Expand Up @@ -292,6 +291,11 @@ void Filter::readDisableDownstream(bool disable) {
}
}

StreamInfo::StreamInfo& Filter::getStreamInfo() {
ASSERT(read_callbacks_ != nullptr);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: del as this will crash in an obvious way on the next line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.. Will remove

return read_callbacks_->connection().streamInfo();
}

void Filter::DownstreamCallbacks::onAboveWriteBufferHighWatermark() {
ASSERT(!on_high_watermark_called_);
on_high_watermark_called_ = true;
Expand Down
3 changes: 1 addition & 2 deletions source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ class Filter : public Network::ReadFilter,
bool on_high_watermark_called_{false};
};

virtual StreamInfo::StreamInfo& getStreamInfo() { return stream_info_; }
virtual StreamInfo::StreamInfo& getStreamInfo();

protected:
struct DownstreamCallbacks : public Network::ConnectionCallbacks {
Expand Down Expand Up @@ -354,7 +354,6 @@ class Filter : public Network::ReadFilter,
std::shared_ptr<UpstreamCallbacks> upstream_callbacks_; // shared_ptr required for passing as a
// read filter.
std::unique_ptr<GenericUpstream> upstream_;
StreamInfo::StreamInfoImpl stream_info_;
RouteConstSharedPtr route_;
Network::TransportSocketOptionsSharedPtr transport_socket_options_;
uint32_t connect_attempts_{};
Expand Down
2 changes: 1 addition & 1 deletion test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ class HttpConnectionManagerImplTest : public testing::Test, public ConnectionMan
bool shouldNormalizePath() const override { return normalize_path_; }
bool shouldMergeSlashes() const override { return merge_slashes_; }

DangerousDeprecatedTestTime test_time_;
Envoy::Event::SimulatedTimeSystem test_time_;
NiceMock<Router::MockRouteConfigProvider> route_config_provider_;
std::shared_ptr<Router::MockConfig> route_config_{new NiceMock<Router::MockConfig>()};
NiceMock<Router::MockScopedRouteConfigProvider> scoped_route_config_provider_;
Expand Down
54 changes: 33 additions & 21 deletions test/common/tcp_proxy/tcp_proxy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,11 @@ class TcpProxyTest : public testing::Test {
TcpProxyTest() {
ON_CALL(*factory_context_.access_log_manager_.file_, write(_))
.WillByDefault(SaveArg<0>(&access_log_data_));
ON_CALL(filter_callbacks_.connection_.stream_info_, onUpstreamHostSelected(_))
.WillByDefault(Invoke(
[this](Upstream::HostDescriptionConstSharedPtr host) { upstream_host_ = host; }));
ON_CALL(filter_callbacks_.connection_.stream_info_, upstreamHost())
.WillByDefault(ReturnPointee(&upstream_host_));
}

~TcpProxyTest() override {
Expand Down Expand Up @@ -953,8 +958,8 @@ class TcpProxyTest : public testing::Test {

NiceMock<Server::Configuration::MockFactoryContext> factory_context_;
ConfigSharedPtr config_;
std::unique_ptr<Filter> filter_;
NiceMock<Network::MockReadFilterCallbacks> filter_callbacks_;
std::unique_ptr<Filter> filter_;
std::vector<std::shared_ptr<NiceMock<Upstream::MockHost>>> upstream_hosts_{};
std::vector<std::unique_ptr<NiceMock<Network::MockClientConnection>>> upstream_connections_{};
std::vector<std::unique_ptr<NiceMock<Tcp::ConnectionPool::MockConnectionData>>>
Expand All @@ -968,6 +973,7 @@ class TcpProxyTest : public testing::Test {
Network::Address::InstanceConstSharedPtr upstream_remote_address_;
std::list<std::function<Tcp::ConnectionPool::Cancellable*(Tcp::ConnectionPool::Cancellable*)>>
new_connection_functions_;
Upstream::HostDescriptionConstSharedPtr upstream_host_{};
};

TEST_F(TcpProxyTest, DEPRECATED_FEATURE_TEST(DefaultRoutes)) {
Expand Down Expand Up @@ -1746,6 +1752,23 @@ TEST_F(TcpProxyTest, ShareFilterState) {
.value());
}

// Tests that filter callback can access downstream and upstream address and ssl properties.
TEST_F(TcpProxyTest, AccessDownstreamAndUpstreamProperties) {
setup(1);

raiseEventUpstreamConnected(0);
EXPECT_EQ(filter_callbacks_.connection().streamInfo().downstreamLocalAddress(),
filter_callbacks_.connection().localAddress());
EXPECT_EQ(filter_callbacks_.connection().streamInfo().downstreamRemoteAddress(),
filter_callbacks_.connection().remoteAddress());
EXPECT_EQ(filter_callbacks_.connection().streamInfo().downstreamSslConnection(),
filter_callbacks_.connection().ssl());
EXPECT_EQ(filter_callbacks_.connection().streamInfo().upstreamLocalAddress(),
upstream_connections_.at(0)->localAddress());
EXPECT_EQ(filter_callbacks_.connection().streamInfo().upstreamSslConnection(),
upstream_connections_.at(0)->streamInfo().downstreamSslConnection());
}

class TcpProxyRoutingTest : public testing::Test {
public:
TcpProxyRoutingTest() = default;
Expand Down Expand Up @@ -1826,13 +1849,10 @@ TEST_F(TcpProxyRoutingTest, DEPRECATED_FEATURE_TEST(UseClusterFromPerConnectionC
setup();
initializeFilter();

NiceMock<StreamInfo::MockStreamInfo> stream_info;
stream_info.filterState()->setData("envoy.tcp_proxy.cluster",
std::make_unique<PerConnectionCluster>("filter_state_cluster"),
StreamInfo::FilterState::StateType::Mutable,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);
ON_CALL(connection_, streamInfo()).WillByDefault(ReturnRef(stream_info));
EXPECT_CALL(Const(connection_), streamInfo()).WillRepeatedly(ReturnRef(stream_info));
connection_.streamInfo().filterState()->setData(
"envoy.tcp_proxy.cluster", std::make_unique<PerConnectionCluster>("filter_state_cluster"),
StreamInfo::FilterState::StateType::Mutable,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);

// Expect filter to try to open a connection to specified cluster.
EXPECT_CALL(factory_context_.cluster_manager_,
Expand All @@ -1847,14 +1867,10 @@ TEST_F(TcpProxyRoutingTest, DEPRECATED_FEATURE_TEST(UpstreamServerName)) {
setup();
initializeFilter();

NiceMock<StreamInfo::MockStreamInfo> stream_info;
stream_info.filterState()->setData("envoy.network.upstream_server_name",
std::make_unique<UpstreamServerName>("www.example.com"),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);

ON_CALL(connection_, streamInfo()).WillByDefault(ReturnRef(stream_info));
EXPECT_CALL(Const(connection_), streamInfo()).WillRepeatedly(ReturnRef(stream_info));
connection_.streamInfo().filterState()->setData(
"envoy.network.upstream_server_name", std::make_unique<UpstreamServerName>("www.example.com"),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);

// Expect filter to try to open a connection to a cluster with the transport socket options with
// override-server-name
Expand Down Expand Up @@ -1882,16 +1898,12 @@ TEST_F(TcpProxyRoutingTest, DEPRECATED_FEATURE_TEST(ApplicationProtocols)) {
setup();
initializeFilter();

NiceMock<StreamInfo::MockStreamInfo> stream_info;
stream_info.filterState()->setData(
connection_.streamInfo().filterState()->setData(
Network::ApplicationProtocols::key(),
std::make_unique<Network::ApplicationProtocols>(std::vector<std::string>{"foo", "bar"}),
StreamInfo::FilterState::StateType::ReadOnly,
StreamInfo::FilterState::LifeSpan::DownstreamConnection);

ON_CALL(connection_, streamInfo()).WillByDefault(ReturnRef(stream_info));
EXPECT_CALL(Const(connection_), streamInfo()).WillRepeatedly(ReturnRef(stream_info));

// Expect filter to try to open a connection to a cluster with the transport socket options with
// override-application-protocol
EXPECT_CALL(factory_context_.cluster_manager_, tcpConnPoolForCluster(_, _, _))
Expand Down
1 change: 1 addition & 0 deletions test/mocks/stream_info/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ envoy_cc_mock(
"//include/envoy/stream_info:stream_info_interface",
"//include/envoy/upstream:upstream_interface",
"//test/mocks/upstream:host_mocks",
"//test/test_common:simulated_time_system_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)
17 changes: 15 additions & 2 deletions test/mocks/stream_info/mocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ namespace Envoy {
namespace StreamInfo {

MockStreamInfo::MockStreamInfo()
: filter_state_(std::make_shared<FilterStateImpl>(FilterState::LifeSpan::FilterChain)),
: start_time_(ts_.systemTime()),
filter_state_(std::make_shared<FilterStateImpl>(FilterState::LifeSpan::FilterChain)),
downstream_local_address_(new Network::Address::Ipv4Instance("127.0.0.2")),
downstream_direct_remote_address_(new Network::Address::Ipv4Instance("127.0.0.1")),
downstream_remote_address_(new Network::Address::Ipv4Instance("127.0.0.1")) {
ON_CALL(*this, upstreamHost()).WillByDefault(ReturnPointee(&host_));
ON_CALL(*this, setResponseFlag(_)).WillByDefault(Invoke([this](ResponseFlag response_flag) {
response_flags_ |= response_flag;
}));
ON_CALL(*this, startTime()).WillByDefault(ReturnPointee(&start_time_));
ON_CALL(*this, startTimeMonotonic()).WillByDefault(ReturnPointee(&start_time_monotonic_));
ON_CALL(*this, lastDownstreamRxByteReceived())
Expand All @@ -37,6 +40,11 @@ MockStreamInfo::MockStreamInfo()
ON_CALL(*this, lastDownstreamTxByteSent())
.WillByDefault(ReturnPointee(&last_downstream_tx_byte_sent_));
ON_CALL(*this, requestComplete()).WillByDefault(ReturnPointee(&end_time_));
ON_CALL(*this, onRequestComplete()).WillByDefault(Invoke([this]() {
end_time_ = absl::make_optional<std::chrono::nanoseconds>(
std::chrono::duration_cast<std::chrono::nanoseconds>(ts_.systemTime() - start_time_)
.count());
}));
ON_CALL(*this, setUpstreamLocalAddress(_))
.WillByDefault(
Invoke([this](const Network::Address::InstanceConstSharedPtr& upstream_local_address) {
Expand Down Expand Up @@ -85,6 +93,11 @@ MockStreamInfo::MockStreamInfo()
bytes_sent_ += bytes_sent;
}));
ON_CALL(*this, bytesSent()).WillByDefault(ReturnPointee(&bytes_sent_));
ON_CALL(*this, hasResponseFlag(_)).WillByDefault(Invoke([this](ResponseFlag flag) {
return response_flags_ & flag;
}));
ON_CALL(*this, upstreamHost()).WillByDefault(ReturnPointee(&host_));

ON_CALL(*this, dynamicMetadata()).WillByDefault(ReturnRef(metadata_));
ON_CALL(Const(*this), dynamicMetadata()).WillByDefault(ReturnRef(metadata_));
ON_CALL(*this, filterState()).WillByDefault(ReturnRef(filter_state_));
Expand Down
3 changes: 3 additions & 0 deletions test/mocks/stream_info/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "common/stream_info/filter_state_impl.h"

#include "test/mocks/upstream/host.h"
#include "test/test_common/simulated_time_system.h"

#include "gmock/gmock.h"

Expand Down Expand Up @@ -91,6 +92,7 @@ class MockStreamInfo : public StreamInfo {

std::shared_ptr<testing::NiceMock<Upstream::MockHostDescription>> host_{
new testing::NiceMock<Upstream::MockHostDescription>()};
Envoy::Event::SimulatedTimeSystem ts_;
SystemTime start_time_;
MonotonicTime start_time_monotonic_;
absl::optional<std::chrono::nanoseconds> last_downstream_rx_byte_received_;
Expand All @@ -104,6 +106,7 @@ class MockStreamInfo : public StreamInfo {
absl::optional<Http::Protocol> protocol_;
absl::optional<uint32_t> response_code_;
absl::optional<std::string> response_code_details_;
uint64_t response_flags_{};
envoy::config::core::v3::Metadata metadata_;
FilterStateSharedPtr upstream_filter_state_;
FilterStateSharedPtr filter_state_;
Expand Down