Skip to content

Commit

Permalink
xds: adding pass-through xds-failover support
Browse files Browse the repository at this point in the history
Signed-off-by: Adi Suissa-Peleg <adip@google.com>
  • Loading branch information
adisuissa committed Dec 14, 2023
1 parent 2c7edc4 commit ae8d101
Show file tree
Hide file tree
Showing 7 changed files with 354 additions and 4 deletions.
8 changes: 8 additions & 0 deletions source/extensions/config_subscription/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,14 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "grpc_mux_failover_lib",
hdrs = ["grpc_mux_failover.h"],
deps = [
":grpc_stream_lib",
],
)

envoy_cc_library(
name = "pausable_ack_queue_lib",
srcs = ["pausable_ack_queue.cc"],
Expand Down
158 changes: 158 additions & 0 deletions source/extensions/config_subscription/grpc/grpc_mux_failover.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
#pragma once

#include "source/extensions/config_subscription/grpc/grpc_stream.h"

namespace Envoy {
namespace Config {

/**
* This class arbitrates between two config providers of the same GrpcMux -
* the primary and the failover. Envoy always prefers fetching config from the
* primary source, but if not available, will fetch the config from the failover
* source until the primary is again available.
*
* This class owns the state for the GrpcMux primary and failover streams, and
* proxies the gRPC stream functionality to either the primary or the failover config sources.
* The failover source is optional and will only be used if passed upon initialization.
*
* The primary config source is always preferred over the failover source. If the primary
* is available, the stream to the failover source will terminated.
*
* Failover is supported in both
* SotW (RequestType = envoy::service::discovery::v3::DiscoveryRequest,
* ResponseType = envoy::service::discovery::v3::DiscoveryResponse), and
* Delta-xDS (RequestType = envoy::service::discovery::v3::DeltaDiscoveryRequest,
* ResponseType = envoy::service::discovery::v3::DeltaDiscoveryResponse).
* Both the primary and failover streams are either SotW or Delta-xDS.
*
* The use of this class will be as follows: the GrpcMux object will own an instance of
* the GrpcMuxFailoverManager. The GrpcMuxFailoverManager will own 2 GrpcStreams, primary
* and secondary. Each of the primary and secondary streams will invoke GrpcStreamCallbacks
* on their corresponding objects (also owned by the GrpcMuxFailoverProxy). These invocations
* will be followed by the GrpcMuxFailoverProxy calling the GrpcStreamCallbacks on the GrpcMux
* object that initialized it.
*
* Note: this class is WIP.
*/
template <class RequestType, class ResponseType> class GrpcMuxFailover {
public:
// A GrpcStream creator function that receives the stream callbacks and returns a
// GrpcStream object. This is introduced to facilitate dependency injection for
// testing and will be used to create the primary and failover streams.
using GrpcStreamCreator = std::function<GrpcStreamInterfacePtr<RequestType, ResponseType>(
GrpcStreamCallbacks<ResponseType>* stream_callbacks)>;

GrpcMuxFailover(GrpcStreamCreator primary_stream_creator,
OptRef<GrpcStreamCreator> failover_stream_creator,
GrpcStreamCallbacks<ResponseType>& grpc_mux_callbacks)
: currently_used_service_(ServiceOptions::None), grpc_mux_callbacks_(grpc_mux_callbacks),
primary_callbacks_(*this),
primary_grpc_stream_(std::move(primary_stream_creator(&primary_callbacks_))) {
ASSERT(primary_grpc_stream_ != nullptr);
// At the moment failover isn't implemented.
ASSERT(!failover_stream_creator.has_value());
}

virtual ~GrpcMuxFailover() = default;

void establishNewStream() {
// TODO(adisuissa): At the moment this is a pass-through method. Once the
// implementation matures, this call will be updated.
primary_grpc_stream_->establishNewStream();
}

bool grpcStreamAvailable() const {
// TODO(adisuissa): At the moment this is a pass-through method. Once the
// implementation matures, this call will be updated.
return primary_grpc_stream_->grpcStreamAvailable();
}

void sendMessage(const RequestType& request) {
// TODO(adisuissa): At the moment this is a pass-through method. Once the
// implementation matures, this call will be updated.
primary_grpc_stream_->sendMessage(request);
}

void maybeUpdateQueueSizeStat(uint64_t size) {
// TODO(adisuissa): At the moment this is a pass-through method. Once the
// implementation matures, this call will be updated.
primary_grpc_stream_->maybeUpdateQueueSizeStat(size);
}

bool checkRateLimitAllowsDrain() {
// TODO(adisuissa): At the moment this is a pass-through method. Once the
// implementation matures, this call will be updated.
return primary_grpc_stream_->checkRateLimitAllowsDrain();
}

absl::optional<Grpc::Status::GrpcStatus> getCloseStatusForTest() {
// TODO(adisuissa): At the moment this is a pass-through method. Once the
// implementation matures, this call will be updated.
return primary_grpc_stream_->getCloseStatusForTest();
}

GrpcStreamInterface<RequestType, ResponseType>& currentStreamForTest() {
// TODO(adisuissa): At the moment this is a pass-through method. Once the
// implementation matures, this call will be updated.
return *primary_grpc_stream_.get();
};

private:
// The different states the GrpcMuxFailover can be in. Note that the following
// are not mutually exclusive, and therefor bitwise value is used. The GrpcMuxFailover
// may be connecting to both the primary and failover sources, or connected to
// the failover source and connecting to the primary source simultaneously
struct ServiceOptions {
static constexpr uint32_t None = 0;
static constexpr uint32_t ConnectingToPrimary = 0x1;
static constexpr uint32_t Primary = 0x2;
static constexpr uint32_t ConnectingToFailover = 0x4;
static constexpr uint32_t Failover = 0x8;
};

// A helper class that proxies the callbacks of GrpcStreamCallbacks for the primary service.
class PrimaryGrpcStreamCallbacksWrapper : public GrpcStreamCallbacks<ResponseType> {
public:
PrimaryGrpcStreamCallbacksWrapper(GrpcMuxFailover& parent) : parent_(parent) {}

virtual void onStreamEstablished() override {
// TODO(adisuissa): At the moment this is a pass-through method. Once the
// implementation matures, this call will be updated.
parent_.grpc_mux_callbacks_.onStreamEstablished();
}

virtual void onEstablishmentFailure() override {
// TODO(adisuissa): At the moment this is a pass-through method. Once the
// implementation matures, this call will be updated.
parent_.grpc_mux_callbacks_.onEstablishmentFailure();
}

virtual void onDiscoveryResponse(ResponseProtoPtr<ResponseType>&& message,
ControlPlaneStats& control_plane_stats) override {
// TODO(adisuissa): At the moment this is a pass-through method. Once the
// implementation matures, this call will be updated.
parent_.grpc_mux_callbacks_.onDiscoveryResponse(std::move(message), control_plane_stats);
}

virtual void onWriteable() override {
// TODO(adisuissa): At the moment this is a pass-through method. Once the
// implementation matures, this call will be updated.
parent_.grpc_mux_callbacks_.onWriteable();
}

private:
GrpcMuxFailover& parent_;
};

uint32_t currently_used_service_;
// The stream callbacks that will be invoked on the GrpcMux object, to notify
// about the state of the underlying primary/failover stream.
GrpcStreamCallbacks<ResponseType>& grpc_mux_callbacks_;
// The callbacks that will be invoked by the primary stream.
PrimaryGrpcStreamCallbacksWrapper primary_callbacks_;
// The stream to the primary source.
GrpcStreamInterfacePtr<RequestType, ResponseType> primary_grpc_stream_;
};

} // namespace Config
} // namespace Envoy
2 changes: 1 addition & 1 deletion source/extensions/config_subscription/grpc/grpc_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class GrpcStream : public GrpcStreamInterface<RequestProto, ResponseProto>,
return false;
}

absl::optional<Grpc::Status::GrpcStatus> getCloseStatusForTest() const override {
absl::optional<Grpc::Status::GrpcStatus> getCloseStatusForTest() const {
return last_close_status_;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ namespace Config {
template <class RequestProto, class ResponseProto>
class GrpcStreamInterface : public Grpc::AsyncStreamCallbacks<ResponseProto> {
public:
virtual ~GrpcStreamInterface() = default;

// Attempt to establish a new gRPC stream to the xDS server.
virtual void establishNewStream() PURE;

Expand All @@ -30,10 +32,9 @@ class GrpcStreamInterface : public Grpc::AsyncStreamCallbacks<ResponseProto> {
// Returns true if a message can be sent from the rate-limiting perspective.
// The rate-limiting counters may be updated by this method.
virtual bool checkRateLimitAllowsDrain() PURE;

// Returns the current close-status, if set.
virtual absl::optional<Grpc::Status::GrpcStatus> getCloseStatusForTest() const PURE;
};

template <class RequestProto, class ResponseProto>
using GrpcStreamInterfacePtr = std::unique_ptr<GrpcStreamInterface<RequestProto, ResponseProto>>;
} // namespace Config
} // namespace Envoy
22 changes: 22 additions & 0 deletions test/extensions/config_subscription/grpc/BUILD
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_mock",
"envoy_cc_test",
"envoy_cc_test_library",
"envoy_package",
Expand Down Expand Up @@ -238,3 +239,24 @@ envoy_cc_test(
"//test/test_common:utility_lib",
],
)

envoy_cc_mock(
name = "grpc_stream_mocks",
hdrs = ["mocks.h"],
deps = [
"//source/extensions/config_subscription/grpc:grpc_stream_interface",
],
)

envoy_cc_test(
name = "grpc_mux_failover_test",
srcs = ["grpc_mux_failover_test.cc"],
deps = [
":grpc_stream_mocks",
"//source/extensions/config_subscription/grpc:grpc_mux_failover_lib",
"//source/extensions/config_subscription/grpc:grpc_stream_interface",
"//test/common/stats:stat_test_utility_lib",
"//test/mocks/config:config_mocks",
"@envoy_api//envoy/service/discovery/v3:pkg_cc_proto",
],
)
132 changes: 132 additions & 0 deletions test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
#include "envoy/service/discovery/v3/discovery.pb.h"

#include "source/extensions/config_subscription/grpc/grpc_mux_failover.h"

#include "test/common/stats/stat_test_utility.h"
#include "test/extensions/config_subscription/grpc/mocks.h"
#include "test/mocks/config/mocks.h"

#include "gmock/gmock.h"
#include "gtest/gtest.h"

using testing::NiceMock;
using testing::Return;

namespace Envoy {
namespace Config {
namespace {

class GrpcMuxFailoverTest : public testing::Test {
protected:
using RequestType = envoy::service::discovery::v3::DiscoveryRequest;
using ResponseType = envoy::service::discovery::v3::DiscoveryResponse;

GrpcMuxFailoverTest()
// The GrpcMuxFailover test uses a the GrpcMuxFailover with mocked GrpcStream objects.
: primary_stream_owner_(std::make_unique<MockGrpcStream<RequestType, ResponseType>>()),
primary_stream_(*primary_stream_owner_.get()),
grpc_mux_failover_(
/*primary_stream_creator=*/
[this](GrpcStreamCallbacks<ResponseType>* callbacks)
-> GrpcStreamInterfacePtr<RequestType, ResponseType> {
primary_callbacks_ = callbacks;
return std::move(primary_stream_owner_);
},
/*failover_stream_creator=*/absl::nullopt,
/*grpc_mux_callbacks=*/grpc_mux_callbacks_) {}

std::unique_ptr<MockGrpcStream<RequestType, ResponseType>> primary_stream_owner_;
MockGrpcStream<RequestType, ResponseType>& primary_stream_;
NiceMock<MockGrpcStreamCallbacks> grpc_mux_callbacks_;
GrpcStreamCallbacks<ResponseType>* primary_callbacks_{nullptr};
GrpcMuxFailover<RequestType, ResponseType> grpc_mux_failover_;
};

// Validates that when establishing a stream the a stream to the primary service
// is established.
// TODO(adisuissa): Update the test once GrpcMuxFailover no longer just
// passes all calls to the primary stream.
TEST_F(GrpcMuxFailoverTest, EstablishPrimaryStream) {
EXPECT_CALL(primary_stream_, establishNewStream());
grpc_mux_failover_.establishNewStream();
}

// Validates that grpcStreamAvailable returns true only if a stream
// is available.
// TODO(adisuissa): Update the test once GrpcMuxFailover no longer just
// passes all calls to the primary stream.
TEST_F(GrpcMuxFailoverTest, ValidatePrimaryStreamAvailable) {
EXPECT_CALL(primary_stream_, grpcStreamAvailable()).WillOnce(Return(false));
EXPECT_FALSE(grpc_mux_failover_.grpcStreamAvailable());
}

// Validates that a message is sent to the primary stream once it is available.
// TODO(adisuissa): Update the test once GrpcMuxFailover no longer just
// passes all calls to the primary stream.
TEST_F(GrpcMuxFailoverTest, SendMessagePrimaryAvailable) {
RequestType msg;
msg.set_version_info("123");
EXPECT_CALL(primary_stream_, sendMessage(_));
grpc_mux_failover_.sendMessage(msg);
}

// Validates that updating the queue size of the primary stream once it is available.
// TODO(adisuissa): Update the test once GrpcMuxFailover no longer just
// passes all calls to the primary stream.
TEST_F(GrpcMuxFailoverTest, MaybeUpdateQueueSizePrimaryAvailable) {
EXPECT_CALL(primary_stream_, maybeUpdateQueueSizeStat(_));
grpc_mux_failover_.maybeUpdateQueueSizeStat(123);
}

// Validates that checkRateLimitAllowsDrain is invoked on the primary stream
// once it is available.
// TODO(adisuissa): Update the test once GrpcMuxFailover no longer just
// passes all calls to the primary stream.
TEST_F(GrpcMuxFailoverTest, CheckRateLimitPrimaryStreamAvailable) {
EXPECT_CALL(primary_stream_, checkRateLimitAllowsDrain()).WillOnce(Return(false));
EXPECT_FALSE(grpc_mux_failover_.checkRateLimitAllowsDrain());
}

// Validates that onStreamEstablished callback is invoked on the primary stream
// callbacks.
// TODO(adisuissa): Update the test once GrpcMuxFailover no longer just
// passes all calls to the primary stream.
TEST_F(GrpcMuxFailoverTest, PrimaryOnStreamEstablishedInvoked) {
EXPECT_CALL(grpc_mux_callbacks_, onStreamEstablished());
primary_callbacks_->onStreamEstablished();
}

// Validates that onEstablishmentFailure callback is invoked on the primary stream
// callbacks.
// TODO(adisuissa): Update the test once GrpcMuxFailover no longer just
// passes all calls to the primary stream.
TEST_F(GrpcMuxFailoverTest, PrimaryOnEstablishmentFailureInvoked) {
EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure());
primary_callbacks_->onEstablishmentFailure();
}

// Validates that onDiscoveryResponse callback is invoked on the primary stream
// callbacks.
// TODO(adisuissa): Update the test once GrpcMuxFailover no longer just
// passes all calls to the primary stream.
TEST_F(GrpcMuxFailoverTest, PrimaryOnDiscoveryResponseInvoked) {
std::unique_ptr<ResponseType> response(std::make_unique<ResponseType>());
response->set_version_info("456");
Stats::TestUtil::TestStore stats;
ControlPlaneStats cp_stats{Utility::generateControlPlaneStats(*stats.rootScope())};
EXPECT_CALL(grpc_mux_callbacks_, onDiscoveryResponse(_, _));
primary_callbacks_->onDiscoveryResponse(std::move(response), cp_stats);
}

// Validates that onWritable callback is invoked on the primary stream
// callbacks.
// TODO(adisuissa): Update the test once GrpcMuxFailover no longer just
// passes all calls to the primary stream.
TEST_F(GrpcMuxFailoverTest, PrimaryOnWriteableInvoked) {
EXPECT_CALL(grpc_mux_callbacks_, onWriteable());
primary_callbacks_->onWriteable();
}

} // namespace
} // namespace Config
} // namespace Envoy
29 changes: 29 additions & 0 deletions test/extensions/config_subscription/grpc/mocks.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#pragma once

#include "source/extensions/config_subscription/grpc/grpc_stream_interface.h"

#include "gmock/gmock.h"

namespace Envoy {
namespace Config {

template <class RequestProto, class ResponseProto>
class MockGrpcStream : public GrpcStreamInterface<RequestProto, ResponseProto> {
public:
MockGrpcStream() {}
~MockGrpcStream() override = default;

MOCK_METHOD(void, establishNewStream, ());
MOCK_METHOD(bool, grpcStreamAvailable, (), (const));
MOCK_METHOD(void, sendMessage, (const RequestProto& request));
MOCK_METHOD(void, maybeUpdateQueueSizeStat, (uint64_t size));
MOCK_METHOD(bool, checkRateLimitAllowsDrain, ());
MOCK_METHOD(void, onCreateInitialMetadata, (Http::RequestHeaderMap & metadata));
MOCK_METHOD(void, onReceiveInitialMetadata, (Http::ResponseHeaderMapPtr && metadata));
MOCK_METHOD(void, onReceiveMessage, (ResponseProtoPtr<ResponseProto> && message));
MOCK_METHOD(void, onReceiveTrailingMetadata, (Http::ResponseTrailerMapPtr && metadata));
MOCK_METHOD(void, onRemoteClose, (Grpc::Status::GrpcStatus status, const std::string& message));
};

} // namespace Config
} // namespace Envoy

0 comments on commit ae8d101

Please sign in to comment.