Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Commit

Permalink
EventEngine::RunAfter: GrpcLb (grpc#30043)
Browse files Browse the repository at this point in the history
* EventEngine::RunAfter: GrpcLb

* Automated change: Fix sanity tests

* add exec_ctx to callbacks

* fix use after move

* remove ref-counting bug; add more ref traces

* reviewer feedback

Co-authored-by: drfloob <drfloob@users.noreply.github.com>
  • Loading branch information
drfloob and drfloob authored Dec 8, 2022
1 parent 7eb99ba commit b19604e
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 88 deletions.
1 change: 0 additions & 1 deletion src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3457,7 +3457,6 @@ grpc_cc_library(
"//:grpc_resolver_fake",
"//:grpc_security_base",
"//:grpc_trace",
"//:iomgr_timer",
"//:orphanable",
"//:protobuf_duration_upb",
"//:protobuf_timestamp_upb",
Expand Down
166 changes: 79 additions & 87 deletions src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/socket_utils.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
Expand Down Expand Up @@ -495,7 +494,8 @@ class GrpcLb : public LoadBalancingPolicy {
"entering fallback mode",
parent_.get(), status.ToString().c_str());
parent_->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&parent_->lb_fallback_timer_);
parent_->channel_control_helper()->GetEventEngine()->Cancel(
*parent_->lb_fallback_timer_handle_);
parent_->fallback_mode_ = true;
parent_->CreateOrUpdateChildPolicyLocked();
// Cancel the watch, since we don't care about the channel state once we
Expand All @@ -516,14 +516,12 @@ class GrpcLb : public LoadBalancingPolicy {

// Methods for dealing with fallback state.
void MaybeEnterFallbackModeAfterStartup();
static void OnFallbackTimer(void* arg, grpc_error_handle error);
void OnFallbackTimerLocked(grpc_error_handle error);
void OnFallbackTimerLocked();

// Methods for dealing with the balancer call.
void StartBalancerCallLocked();
void StartBalancerCallRetryTimerLocked();
static void OnBalancerCallRetryTimer(void* arg, grpc_error_handle error);
void OnBalancerCallRetryTimerLocked(grpc_error_handle error);
void OnBalancerCallRetryTimerLocked();

// Methods for dealing with the child policy.
ChannelArgs CreateChildPolicyArgsLocked(
Expand All @@ -536,8 +534,7 @@ class GrpcLb : public LoadBalancingPolicy {
void CacheDeletedSubchannelLocked(
RefCountedPtr<SubchannelInterface> subchannel);
void StartSubchannelCacheTimerLocked();
static void OnSubchannelCacheTimer(void* arg, grpc_error_handle error);
void OnSubchannelCacheTimerLocked(grpc_error_handle error);
void OnSubchannelCacheTimerLocked();

// Who the client is trying to communicate with.
std::string server_name_;
Expand Down Expand Up @@ -568,9 +565,7 @@ class GrpcLb : public LoadBalancingPolicy {
const Duration lb_call_timeout_;
// Balancer call retry state.
BackOff lb_call_backoff_;
bool retry_timer_callback_pending_ = false;
grpc_timer lb_call_retry_timer_;
grpc_closure lb_on_call_retry_;
absl::optional<EventEngine::TaskHandle> lb_call_retry_timer_handle_;

// The deserialized response from the balancer. May be nullptr until one
// such response has arrived.
Expand All @@ -588,8 +583,7 @@ class GrpcLb : public LoadBalancingPolicy {
// we have not received a serverlist from the balancer.
const Duration fallback_at_startup_timeout_;
bool fallback_at_startup_checks_pending_ = false;
grpc_timer lb_fallback_timer_;
grpc_closure lb_on_fallback_;
absl::optional<EventEngine::TaskHandle> lb_fallback_timer_handle_;

// The child policy to use for the backends.
OrphanablePtr<LoadBalancingPolicy> child_policy_;
Expand All @@ -601,9 +595,7 @@ class GrpcLb : public LoadBalancingPolicy {
std::map<Timestamp /*deletion time*/,
std::vector<RefCountedPtr<SubchannelInterface>>>
cached_subchannels_;
grpc_timer subchannel_cache_timer_;
grpc_closure on_subchannel_cache_timer_;
bool subchannel_cache_timer_pending_ = false;
absl::optional<EventEngine::TaskHandle> subchannel_cache_timer_handle_;
};

//
Expand Down Expand Up @@ -1263,7 +1255,8 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() {
}
if (grpclb_policy()->fallback_at_startup_checks_pending_) {
grpclb_policy()->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&grpclb_policy()->lb_fallback_timer_);
grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
*grpclb_policy()->lb_fallback_timer_handle_);
grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
}
// Update the serverlist in the GrpcLb instance. This serverlist
Expand All @@ -1281,7 +1274,8 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() {
grpclb_policy());
if (grpclb_policy()->fallback_at_startup_checks_pending_) {
grpclb_policy()->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&grpclb_policy()->lb_fallback_timer_);
grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
*grpclb_policy()->lb_fallback_timer_handle_);
grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
}
grpclb_policy()->fallback_mode_ = true;
Expand Down Expand Up @@ -1348,7 +1342,8 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
"serverlist; entering fallback mode",
grpclb_policy());
grpclb_policy()->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&grpclb_policy()->lb_fallback_timer_);
grpclb_policy()->channel_control_helper()->GetEventEngine()->Cancel(
*grpclb_policy()->lb_fallback_timer_handle_);
grpclb_policy()->CancelBalancerChannelConnectivityWatchLocked();
grpclb_policy()->fallback_mode_ = true;
grpclb_policy()->CreateOrUpdateChildPolicyLocked();
Expand Down Expand Up @@ -1500,29 +1495,25 @@ GrpcLb::GrpcLb(Args args)
"[grpclb %p] Will use '%s' as the server name for LB request.",
this, server_name_.c_str());
}
// Closure Initialization
GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimer, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimer, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&on_subchannel_cache_timer_, &OnSubchannelCacheTimer, this,
nullptr);
}

void GrpcLb::ShutdownLocked() {
shutting_down_ = true;
lb_calld_.reset();
if (subchannel_cache_timer_pending_) {
subchannel_cache_timer_pending_ = false;
grpc_timer_cancel(&subchannel_cache_timer_);
if (subchannel_cache_timer_handle_.has_value()) {
channel_control_helper()->GetEventEngine()->Cancel(
*subchannel_cache_timer_handle_);
subchannel_cache_timer_handle_.reset();
}
cached_subchannels_.clear();
if (retry_timer_callback_pending_) {
grpc_timer_cancel(&lb_call_retry_timer_);
if (lb_call_retry_timer_handle_.has_value()) {
channel_control_helper()->GetEventEngine()->Cancel(
*lb_call_retry_timer_handle_);
}
if (fallback_at_startup_checks_pending_) {
fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&lb_fallback_timer_);
channel_control_helper()->GetEventEngine()->Cancel(
*lb_fallback_timer_handle_);
CancelBalancerChannelConnectivityWatchLocked();
}
if (child_policy_ != nullptr) {
Expand Down Expand Up @@ -1583,9 +1574,18 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
if (is_initial_update) {
fallback_at_startup_checks_pending_ = true;
// Start timer.
Timestamp deadline = Timestamp::Now() + fallback_at_startup_timeout_;
Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Ref for callback
grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
lb_fallback_timer_handle_ =
channel_control_helper()->GetEventEngine()->RunAfter(
fallback_at_startup_timeout_,
[self = static_cast<RefCountedPtr<GrpcLb>>(
Ref(DEBUG_LOCATION, "on_fallback_timer"))]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
auto self_ptr = self.get();
self_ptr->work_serializer()->Run(
[self = std::move(self)]() { self->OnFallbackTimerLocked(); },
DEBUG_LOCATION);
});
// Start watching the channel's connectivity state. If the channel
// goes into state TRANSIENT_FAILURE before the timer fires, we go into
// fallback mode even if the fallback timeout has not elapsed.
Expand Down Expand Up @@ -1675,10 +1675,9 @@ void GrpcLb::StartBalancerCallLocked() {
}

void GrpcLb::StartBalancerCallRetryTimerLocked() {
Timestamp next_try = lb_call_backoff_.NextAttemptTime();
Duration timeout = lb_call_backoff_.NextAttemptTime() - Timestamp::Now();
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this);
Duration timeout = next_try - Timestamp::Now();
if (timeout > Duration::Zero()) {
gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.",
this, timeout.millis());
Expand All @@ -1687,33 +1686,30 @@ void GrpcLb::StartBalancerCallRetryTimerLocked() {
this);
}
}
// TODO(roth): We currently track this ref manually. Once the
// ClosureRef API is ready, we should pass the RefCountedPtr<> along
// with the callback.
auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
self.release();
retry_timer_callback_pending_ = true;
grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
}

void GrpcLb::OnBalancerCallRetryTimer(void* arg, grpc_error_handle error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
grpclb_policy->work_serializer()->Run(
[grpclb_policy, error]() {
grpclb_policy->OnBalancerCallRetryTimerLocked(error);
},
DEBUG_LOCATION);
lb_call_retry_timer_handle_ =
channel_control_helper()->GetEventEngine()->RunAfter(
timeout,
[self = static_cast<RefCountedPtr<GrpcLb>>(
Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer"))]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
auto self_ptr = self.get();
self_ptr->work_serializer()->Run(
[self = std::move(self)]() {
self->OnBalancerCallRetryTimerLocked();
},
DEBUG_LOCATION);
});
}

void GrpcLb::OnBalancerCallRetryTimerLocked(grpc_error_handle error) {
retry_timer_callback_pending_ = false;
if (!shutting_down_ && error.ok() && lb_calld_ == nullptr) {
void GrpcLb::OnBalancerCallRetryTimerLocked() {
lb_call_retry_timer_handle_.reset();
if (!shutting_down_ && lb_calld_ == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server", this);
}
StartBalancerCallLocked();
}
Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
}

//
Expand All @@ -1738,17 +1734,10 @@ void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
}
}

void GrpcLb::OnFallbackTimer(void* arg, grpc_error_handle error) {
GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
grpclb_policy->work_serializer()->Run(
[grpclb_policy, error]() { grpclb_policy->OnFallbackTimerLocked(error); },
DEBUG_LOCATION);
}

void GrpcLb::OnFallbackTimerLocked(grpc_error_handle error) {
void GrpcLb::OnFallbackTimerLocked() {
// If we receive a serverlist after the timer fires but before this callback
// actually runs, don't fall back.
if (fallback_at_startup_checks_pending_ && !shutting_down_ && error.ok()) {
if (fallback_at_startup_checks_pending_ && !shutting_down_) {
gpr_log(GPR_INFO,
"[grpclb %p] No response from balancer after fallback timeout; "
"entering fallback mode",
Expand All @@ -1758,7 +1747,6 @@ void GrpcLb::OnFallbackTimerLocked(grpc_error_handle error) {
fallback_mode_ = true;
CreateOrUpdateChildPolicyLocked();
}
Unref(DEBUG_LOCATION, "on_fallback_timer");
}

//
Expand All @@ -1781,7 +1769,8 @@ OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.work_serializer = work_serializer();
lb_policy_args.args = args;
lb_policy_args.channel_control_helper = std::make_unique<Helper>(Ref());
lb_policy_args.channel_control_helper =
std::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
OrphanablePtr<LoadBalancingPolicy> lb_policy =
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&grpc_lb_glb_trace);
Expand All @@ -1804,9 +1793,10 @@ void GrpcLb::CreateOrUpdateChildPolicyLocked() {
bool is_backend_from_grpclb_load_balancer = false;
if (fallback_mode_) {
// If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
// received any serverlist from the balancer, we use the fallback backends
// returned by the resolver. Note that the fallback backend list may be
// empty, in which case the new child policy will fail the picks.
// received any serverlist from the balancer, we use the fallback
// backends returned by the resolver. Note that the fallback backend
// list may be empty, in which case the new child policy will fail the
// picks.
update_args.addresses = fallback_backend_addresses_;
if (fallback_backend_addresses_.ok() &&
fallback_backend_addresses_->empty()) {
Expand Down Expand Up @@ -1845,28 +1835,32 @@ void GrpcLb::CacheDeletedSubchannelLocked(
RefCountedPtr<SubchannelInterface> subchannel) {
Timestamp deletion_time = Timestamp::Now() + subchannel_cache_interval_;
cached_subchannels_[deletion_time].push_back(std::move(subchannel));
if (!subchannel_cache_timer_pending_) {
Ref(DEBUG_LOCATION, "OnSubchannelCacheTimer").release();
subchannel_cache_timer_pending_ = true;
if (!subchannel_cache_timer_handle_.has_value()) {
StartSubchannelCacheTimerLocked();
}
}

void GrpcLb::StartSubchannelCacheTimerLocked() {
GPR_ASSERT(!cached_subchannels_.empty());
grpc_timer_init(&subchannel_cache_timer_, cached_subchannels_.begin()->first,
&on_subchannel_cache_timer_);
}

void GrpcLb::OnSubchannelCacheTimer(void* arg, grpc_error_handle error) {
auto* self = static_cast<GrpcLb*>(arg);
self->work_serializer()->Run(
[self, error]() { self->GrpcLb::OnSubchannelCacheTimerLocked(error); },
DEBUG_LOCATION);
subchannel_cache_timer_handle_ =
channel_control_helper()->GetEventEngine()->RunAfter(
cached_subchannels_.begin()->first - Timestamp::Now(),
[self = static_cast<RefCountedPtr<GrpcLb>>(
Ref(DEBUG_LOCATION, "OnSubchannelCacheTimer"))]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
auto* self_ptr = self.get();
self_ptr->work_serializer()->Run(
[self = std::move(self)]() mutable {
self->OnSubchannelCacheTimerLocked();
},
DEBUG_LOCATION);
});
}

void GrpcLb::OnSubchannelCacheTimerLocked(grpc_error_handle error) {
if (subchannel_cache_timer_pending_ && error.ok()) {
void GrpcLb::OnSubchannelCacheTimerLocked() {
if (subchannel_cache_timer_handle_.has_value()) {
subchannel_cache_timer_handle_.reset();
auto it = cached_subchannels_.begin();
if (it != cached_subchannels_.end()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
Expand All @@ -1880,9 +1874,7 @@ void GrpcLb::OnSubchannelCacheTimerLocked(grpc_error_handle error) {
StartSubchannelCacheTimerLocked();
return;
}
subchannel_cache_timer_pending_ = false;
}
Unref(DEBUG_LOCATION, "OnSubchannelCacheTimer");
}

//
Expand Down

0 comments on commit b19604e

Please sign in to comment.