Skip to content

Commit

Permalink
grpc: add onServiceReachable support for EnvoyGrpc
Browse files Browse the repository at this point in the history
Signed-off-by: Adi Suissa-Peleg <adip@google.com>
  • Loading branch information
adisuissa committed Jan 19, 2024
1 parent 6b592d7 commit d17d9e4
Show file tree
Hide file tree
Showing 28 changed files with 293 additions and 14 deletions.
5 changes: 5 additions & 0 deletions envoy/grpc/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ class RawAsyncStreamCallbacks {
*/
virtual void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) PURE;

/**
* Called when the service that this stream needs to connect to is established.
*/
virtual void onServiceReachable() PURE;

/**
* Called when initial metadata is received. This will be called with empty metadata on a
* trailers-only response, followed by onReceiveTrailingMetadata() with the trailing metadata.
Expand Down
10 changes: 10 additions & 0 deletions source/common/grpc/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ void AsyncStreamImpl::initialize(bool buffer_body_for_retry) {
void AsyncStreamImpl::onHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) {
const auto http_response_status = Http::Utility::getResponseStatus(*headers);
const auto grpc_status = Common::getGrpcStatus(*headers);
// If the HTTP status is OK and there is no gRPC status (will be part of
// the trailers) or there is one with an OK gRPC status, notify that the
// upstream is available.
if ((http_response_status == enumToInt(Http::Code::OK)) &&
(!grpc_status.has_value() ||
(grpc_status.value() == Grpc::Status::WellKnownGrpcStatus::Ok))) {
callbacks_.onServiceReachable();
}
callbacks_.onReceiveInitialMetadata(end_stream ? Http::ResponseHeaderMapImpl::create()
: std::move(headers));
if (http_response_status != enumToInt(Http::Code::OK)) {
Expand Down Expand Up @@ -260,6 +268,8 @@ void AsyncRequestImpl::onCreateInitialMetadata(Http::RequestHeaderMap& metadata)
callbacks_.onCreateInitialMetadata(metadata);
}

void AsyncRequestImpl::onServiceReachable() {}

void AsyncRequestImpl::onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) {}

bool AsyncRequestImpl::onReceiveMessageRaw(Buffer::InstancePtr&& response) {
Expand Down
4 changes: 3 additions & 1 deletion source/common/grpc/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,12 @@ class AsyncStreamImpl : public RawAsyncStream,
std::string method_name_;
RawAsyncStreamCallbacks& callbacks_;
Http::AsyncClient::StreamOptions options_;
bool http_reset_{};
Http::AsyncClient::Stream* stream_{};
Decoder decoder_;
// This is a member to avoid reallocation on every onData().
std::vector<Frame> decoded_frames_;
bool http_reset_{};
bool service_reachable_{};

friend class AsyncClientImpl;
};
Expand All @@ -118,6 +119,7 @@ class AsyncRequestImpl : public AsyncRequest, public AsyncStreamImpl, RawAsyncSt
private:
// Grpc::AsyncStreamCallbacks
void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override;
void onServiceReachable() override;
void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override;
bool onReceiveMessageRaw(Buffer::InstancePtr&& response) override;
void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override;
Expand Down
3 changes: 3 additions & 0 deletions source/common/grpc/google_async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,9 @@ void GoogleAsyncRequestImpl::onCreateInitialMetadata(Http::RequestHeaderMap& met
callbacks_.onCreateInitialMetadata(metadata);
}

// Only relevant to the xDS stream reachability.
void GoogleAsyncRequestImpl::onServiceReachable() {}

void GoogleAsyncRequestImpl::onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) {}

bool GoogleAsyncRequestImpl::onReceiveMessageRaw(Buffer::InstancePtr&& response) {
Expand Down
1 change: 1 addition & 0 deletions source/common/grpc/google_async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ class GoogleAsyncRequestImpl : public AsyncRequest,
private:
// Grpc::RawAsyncStreamCallbacks
void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override;
void onServiceReachable() override;
void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override;
bool onReceiveMessageRaw(Buffer::InstancePtr&& response) override;
void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override;
Expand Down
2 changes: 2 additions & 0 deletions source/common/upstream/health_discovery_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ void HdsDelegate::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
UNREFERENCED_PARAMETER(metadata);
}

void HdsDelegate::onServiceReachable() {}

void HdsDelegate::onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) {
UNREFERENCED_PARAMETER(metadata);
}
Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/health_discovery_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class HdsDelegate : Grpc::AsyncStreamCallbacks<envoy::service::health::v3::Healt

// Grpc::AsyncStreamCallbacks
void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override;
void onServiceReachable() override;
void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) override;
void onReceiveMessage(
std::unique_ptr<envoy::service::health::v3::HealthCheckSpecifier>&& message) override;
Expand Down
2 changes: 2 additions & 0 deletions source/common/upstream/load_stats_reporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ void LoadStatsReporter::onCreateInitialMetadata(Http::RequestHeaderMap& metadata
UNREFERENCED_PARAMETER(metadata);
}

void LoadStatsReporter::onServiceReachable() {}

void LoadStatsReporter::onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) {
UNREFERENCED_PARAMETER(metadata);
}
Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/load_stats_reporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class LoadStatsReporter

// Grpc::AsyncStreamCallbacks
void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override;
void onServiceReachable() override;
void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) override;
void onReceiveMessage(
std::unique_ptr<envoy::service::load_stats::v3::LoadStatsResponse>&& message) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ class StreamingGrpcAccessLogClient : public GrpcAccessLogClient<LogRequest, LogR

// Grpc::AsyncStreamCallbacks
void onCreateInitialMetadata(Http::RequestHeaderMap&) override {}
void onServiceReachable() override {}
void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {}
void onReceiveMessage(std::unique_ptr<LogResponse>&&) override {}
void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {}
Expand Down
4 changes: 4 additions & 0 deletions source/extensions/common/wasm/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1239,6 +1239,10 @@ std::pair<uint32_t, std::string_view> Context::getStatus() {
return std::make_pair(status_code_, toStdStringView(status_message_));
}

void Context::onGrpcServiceReachable(uint32_t) {
// Currently this is not support by the WASM SDK.
}

void Context::onGrpcReceiveInitialMetadataWrapper(uint32_t token, Http::HeaderMapPtr&& metadata) {
grpc_receive_initial_metadata_ = std::move(metadata);
onGrpcReceiveInitialMetadata(token, headerSize(grpc_receive_initial_metadata_));
Expand Down
2 changes: 2 additions & 0 deletions source/extensions/common/wasm/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ class Context : public proxy_wasm::ContextBase,
void onCreateInitialMetadata(Http::RequestHeaderMap& initial_metadata) override {
context_->onGrpcCreateInitialMetadata(token_, initial_metadata);
}
void onServiceReachable() override { context_->onGrpcServiceReachable(token_); }
void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) override {
context_->onGrpcReceiveInitialMetadataWrapper(token_, std::move(metadata));
}
Expand Down Expand Up @@ -365,6 +366,7 @@ class Context : public proxy_wasm::ContextBase,
void onHttpCallFailure(uint32_t token, Http::AsyncClient::FailureReason reason);

void onGrpcCreateInitialMetadata(uint32_t token, Http::RequestHeaderMap& metadata);
void onGrpcServiceReachable(uint32_t token);
void onGrpcReceiveInitialMetadataWrapper(uint32_t token, Http::HeaderMapPtr&& metadata);
void onGrpcReceiveWrapper(uint32_t token, ::Envoy::Buffer::InstancePtr response);
void onGrpcReceiveTrailingMetadataWrapper(uint32_t token, Http::HeaderMapPtr&& metadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ GrpcMuxImpl::GrpcMuxImpl(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_
: grpc_stream_(this, std::move(grpc_mux_context.async_client_),
grpc_mux_context.service_method_, grpc_mux_context.dispatcher_,
grpc_mux_context.scope_, std::move(grpc_mux_context.backoff_strategy_),
grpc_mux_context.rate_limit_settings_),
grpc_mux_context.rate_limit_settings_, false),
local_info_(grpc_mux_context.local_info_), skip_subsequent_node_(skip_subsequent_node),
config_validators_(std::move(grpc_mux_context.config_validators_)),
xds_config_tracker_(grpc_mux_context.xds_config_tracker_),
Expand Down
40 changes: 35 additions & 5 deletions source/extensions/config_subscription/grpc/grpc_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ class GrpcStream : public GrpcStreamInterface<RequestProto, ResponseProto>,
GrpcStream(GrpcStreamCallbacks<ResponseProto>* callbacks, Grpc::RawAsyncClientPtr async_client,
const Protobuf::MethodDescriptor& service_method, Event::Dispatcher& dispatcher,
Stats::Scope& scope, BackOffStrategyPtr backoff_strategy,
const RateLimitSettings& rate_limit_settings)
const RateLimitSettings& rate_limit_settings, bool enable_service_reachable)
: callbacks_(callbacks), async_client_(std::move(async_client)),
service_method_(service_method),
control_plane_stats_(Utility::generateControlPlaneStats(scope)),
time_source_(dispatcher.timeSource()), backoff_strategy_(std::move(backoff_strategy)),
rate_limiting_enabled_(rate_limit_settings.enabled_) {
rate_limiting_enabled_(rate_limit_settings.enabled_),
enable_service_reachable_(enable_service_reachable), service_reachable_(false) {
retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); });
if (rate_limiting_enabled_) {
// Default Bucket contains 100 tokens maximum and refills at 10 tokens/sec.
Expand Down Expand Up @@ -63,11 +64,33 @@ class GrpcStream : public GrpcStreamInterface<RequestProto, ResponseProto>,
setRetryTimer();
return;
}
control_plane_stats_.connected_state_.set(1);
callbacks_->onStreamEstablished();
if (!enable_service_reachable_) {
// The following callback will be executed before the actual stream is established
// and reachable, as the `async_client_->start()` call above is non-blocking.
control_plane_stats_.connected_state_.set(1);
callbacks_->onStreamEstablished();
}
}

bool grpcStreamAvailable() const override { return stream_ != nullptr; }
bool grpcStreamAvailable() const override {
if (enable_service_reachable_) {
return (stream_ != nullptr) && (service_reachable_);
} else {
return stream_ != nullptr;
}
}

void onServiceReachable() override {
// Only invoke service-reachable callback if it is enabled.
if (!enable_service_reachable_) {
return;
}
if (!service_reachable_) {
service_reachable_ = true;
ENVOY_LOG_MISC(trace, "xDS gRPC stream is reachable and is now established");
callbacks_->onStreamEstablished();
}
}

void sendMessage(const RequestProto& request) override { stream_->sendMessage(request, false); }

Expand Down Expand Up @@ -98,6 +121,9 @@ class GrpcStream : public GrpcStreamInterface<RequestProto, ResponseProto>,
}

void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override {
if (enable_service_reachable_) {
service_reachable_ = false;
}
logClose(status, message);
stream_ = nullptr;
control_plane_stats_.connected_state_.set(0);
Expand Down Expand Up @@ -243,6 +269,10 @@ class GrpcStream : public GrpcStreamInterface<RequestProto, ResponseProto>,
absl::optional<Grpc::Status::GrpcStatus> last_close_status_;
std::string last_close_message_;
MonotonicTime last_close_time_;

// Whether the xDS service is reachable or not.
const bool enable_service_reachable_;
bool service_reachable_;
};

} // namespace Config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ NewGrpcMuxImpl::NewGrpcMuxImpl(GrpcMuxContext& grpc_mux_context)
: grpc_stream_(this, std::move(grpc_mux_context.async_client_),
grpc_mux_context.service_method_, grpc_mux_context.dispatcher_,
grpc_mux_context.scope_, std::move(grpc_mux_context.backoff_strategy_),
grpc_mux_context.rate_limit_settings_),
grpc_mux_context.rate_limit_settings_, false),
local_info_(grpc_mux_context.local_info_),
config_validators_(std::move(grpc_mux_context.config_validators_)),
dynamic_update_callback_handle_(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ GrpcMuxImpl<S, F, RQ, RS>::GrpcMuxImpl(std::unique_ptr<F> subscription_state_fac
: grpc_stream_(this, std::move(grpc_mux_content.async_client_),
grpc_mux_content.service_method_, grpc_mux_content.dispatcher_,
grpc_mux_content.scope_, std::move(grpc_mux_content.backoff_strategy_),
grpc_mux_content.rate_limit_settings_),
grpc_mux_content.rate_limit_settings_, false),
subscription_state_factory_(std::move(subscription_state_factory)),
skip_subsequent_node_(skip_subsequent_node), local_info_(grpc_mux_content.local_info_),
dynamic_update_callback_handle_(
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/http/ext_proc/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ void ExternalProcessorStreamImpl::onReceiveMessage(ProcessingResponsePtr&& respo
}

void ExternalProcessorStreamImpl::onCreateInitialMetadata(Http::RequestHeaderMap&) {}
void ExternalProcessorStreamImpl::onServiceReachable() {}
void ExternalProcessorStreamImpl::onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) {}
void ExternalProcessorStreamImpl::onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) {}

Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/http/ext_proc/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class ExternalProcessorStreamImpl : public ExternalProcessorStream,

// RawAsyncStreamCallbacks
void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override;
void onServiceReachable() override;
void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) override;
void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) override;
void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class RateLimitClientImpl : public RateLimitClient,

// RawAsyncStreamCallbacks methods;
void onCreateInitialMetadata(Http::RequestHeaderMap&) override {}
void onServiceReachable() override {}
void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {}
void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {}
void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class GrpcMetricsStreamer : public Grpc::AsyncStreamCallbacks<ResponseProto> {

// Grpc::AsyncStreamCallbacks
void onCreateInitialMetadata(Http::RequestHeaderMap&) override {}
void onServiceReachable() override {}
void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {}
void onReceiveMessage(std::unique_ptr<ResponseProto>&&) override {}
void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class OpenTelemetryGrpcTraceExporterClient : Logger::Loggable<Logger::Id::tracin

// Grpc::AsyncStreamCallbacks
void onCreateInitialMetadata(Http::RequestHeaderMap&) override {}
void onServiceReachable() override {}
void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {}
void onReceiveMessage(
std::unique_ptr<opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse>&&)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class TraceSegmentReporter : public Logger::Loggable<Logger::Id::tracing>,

// Grpc::AsyncStreamCallbacks
void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override;
void onServiceReachable() override {}
void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {}
void onReceiveMessage(std::unique_ptr<skywalking::v3::Commands>&&) override {}
void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {}
Expand Down
Loading

0 comments on commit d17d9e4

Please sign in to comment.