diff --git a/config/base_config_test.go b/config/base_config_test.go index 7d5fa5534b..ce0e0316bf 100644 --- a/config/base_config_test.go +++ b/config/base_config_test.go @@ -23,6 +23,7 @@ import ( import ( "github.com/stretchr/testify/assert" + "go.uber.org/atomic" ) import ( @@ -125,7 +126,7 @@ func TestRefresh(t *testing.T) { StepTimeout: "2s", RejectRequestHandler: "mock", RejectRequest: false, - RequestsFinished: false, + RequestsFinished: &atomic.Bool{}, }, } diff --git a/config/config_loader_test.go b/config/config_loader_test.go index b0935a2885..084b6a8639 100644 --- a/config/config_loader_test.go +++ b/config/config_loader_test.go @@ -18,6 +18,9 @@ package config import ( + "context" + "dubbo.apache.org/dubbo-go/v3/filter" + "dubbo.apache.org/dubbo-go/v3/protocol" "path/filepath" "sort" "sync" @@ -74,6 +77,13 @@ func TestConfigLoader(t *testing.T) { } func TestLoad(t *testing.T) { + extension.SetFilter(constant.CONSUMER_SHUTDOWN_FILTER, func() filter.Filter { + return &mockGracefulShutdownFilter{} + }) + extension.SetFilter(constant.PROVIDER_SHUTDOWN_FILTER, func() filter.Filter { + return &mockGracefulShutdownFilter{} + }) + doInitConsumer() doInitProvider() @@ -596,3 +606,17 @@ func ConvertURLArrToIntfArr(urls []*common.URL) []interface{} { } return res } + +type mockGracefulShutdownFilter struct{} + +func (f *mockGracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + panic("implement me") +} + +func (f *mockGracefulShutdownFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + panic("implement me") +} + +func (f *mockGracefulShutdownFilter) Set(name string, config interface{}) { + return +} diff --git a/config/config_setter.go b/config/config_setter.go new file mode 100644 index 0000000000..9f114f26d0 --- /dev/null +++ b/config/config_setter.go @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +const ( + GracefulShutdownFilterShutdownConfig = "GracefulShutdownFilterShutdownConfig" +) + +type Setter interface { + Set(name string, config interface{}) +} diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go index fd34e200be..6c2875a91b 100644 --- a/config/graceful_shutdown.go +++ b/config/graceful_shutdown.go @@ -59,6 +59,14 @@ func GracefulShutdownInit() { signal.Notify(signals, ShutdownSignals...) + // retrieve ShutdownConfig for gracefulShutdownFilter + if filter, ok := extension.GetFilter(constant.CONSUMER_SHUTDOWN_FILTER).(Setter); ok && GetConsumerConfig().ShutdownConfig != nil { + filter.Set(GracefulShutdownFilterShutdownConfig, GetConsumerConfig().ShutdownConfig) + } + if filter, ok := extension.GetFilter(constant.PROVIDER_SHUTDOWN_FILTER).(Setter); ok && GetProviderConfig().ShutdownConfig != nil { + filter.Set(GracefulShutdownFilterShutdownConfig, GetProviderConfig().ShutdownConfig) + } + go func() { select { case sig := <-signals: @@ -163,7 +171,6 @@ func waitForReceivingRequests() { // ignore this step return } - providerConfig.ShutdownConfig.RejectRequest = true waitingProcessedTimeout(providerConfig.ShutdownConfig) } @@ -174,7 +181,6 @@ func waitForSendingRequests() { // ignore this step return } - consumerConfig.ShutdownConfig.RejectRequest = true waitingProcessedTimeout(consumerConfig.ShutdownConfig) } @@ -185,7 +191,7 @@ func waitingProcessedTimeout(shutdownConfig *ShutdownConfig) { } deadline := time.Now().Add(timeout) - for time.Now().Before(deadline) && !shutdownConfig.RequestsFinished { + for time.Now().Before(deadline) && !shutdownConfig.RequestsFinished.Load() { // sleep 10 ms and then we check it again time.Sleep(10 * time.Millisecond) } diff --git a/config/graceful_shutdown_config.go b/config/graceful_shutdown_config.go index fd0bfb41a8..9a63a02ebb 100644 --- a/config/graceful_shutdown_config.go +++ b/config/graceful_shutdown_config.go @@ -21,6 +21,10 @@ import ( "time" ) +import ( + "go.uber.org/atomic" +) + import ( "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/logger" @@ -54,7 +58,7 @@ type ShutdownConfig struct { RejectRequest bool // true -> all requests had been processed. In provider side it means that all requests are returned response to clients // In consumer side, it means that all requests getting response from servers - RequestsFinished bool + RequestsFinished *atomic.Bool } // nolint diff --git a/config/graceful_shutdown_config_test.go b/config/graceful_shutdown_config_test.go index 43d2fc3c1b..494765ccb8 100644 --- a/config/graceful_shutdown_config_test.go +++ b/config/graceful_shutdown_config_test.go @@ -24,12 +24,15 @@ import ( import ( "github.com/stretchr/testify/assert" + "go.uber.org/atomic" ) func TestShutdownConfigGetTimeout(t *testing.T) { - config := ShutdownConfig{} + config := ShutdownConfig{ + RequestsFinished: &atomic.Bool{}, + } assert.False(t, config.RejectRequest) - assert.False(t, config.RequestsFinished) + assert.False(t, config.RequestsFinished.Load()) config = ShutdownConfig{ Timeout: "60s", diff --git a/config/graceful_shutdown_test.go b/config/graceful_shutdown_test.go index 870302d288..42e80d0461 100644 --- a/config/graceful_shutdown_test.go +++ b/config/graceful_shutdown_test.go @@ -21,13 +21,24 @@ import ( "testing" ) +import ( + "go.uber.org/atomic" +) + import ( "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/filter" "dubbo.apache.org/dubbo-go/v3/protocol" ) func TestGracefulShutdownInit(t *testing.T) { + extension.SetFilter(constant.CONSUMER_SHUTDOWN_FILTER, func() filter.Filter { + return &mockGracefulShutdownFilter{} + }) + extension.SetFilter(constant.PROVIDER_SHUTDOWN_FILTER, func() filter.Filter { + return &mockGracefulShutdownFilter{} + }) GracefulShutdownInit() } @@ -49,13 +60,16 @@ func TestBeforeShutdown(t *testing.T) { } // without configuration + consumerConfig = nil + providerConfig = nil BeforeShutdown() consumerConfig = &ConsumerConfig{ References: consumerReferences, ShutdownConfig: &ShutdownConfig{ - Timeout: "1", - StepTimeout: "1s", + Timeout: "1", + StepTimeout: "1s", + RequestsFinished: &atomic.Bool{}, }, } @@ -70,8 +84,9 @@ func TestBeforeShutdown(t *testing.T) { providerConfig = &ProviderConfig{ ShutdownConfig: &ShutdownConfig{ - Timeout: "1", - StepTimeout: "1s", + Timeout: "1", + StepTimeout: "1s", + RequestsFinished: &atomic.Bool{}, }, Protocols: providerProtocols, } @@ -80,8 +95,9 @@ func TestBeforeShutdown(t *testing.T) { providerConfig = &ProviderConfig{ ShutdownConfig: &ShutdownConfig{ - Timeout: "1", - StepTimeout: "-1s", + Timeout: "1", + StepTimeout: "-1s", + RequestsFinished: &atomic.Bool{}, }, Protocols: providerProtocols, } @@ -89,8 +105,9 @@ func TestBeforeShutdown(t *testing.T) { consumerConfig = &ConsumerConfig{ References: consumerReferences, ShutdownConfig: &ShutdownConfig{ - Timeout: "1", - StepTimeout: "-1s", + Timeout: "1", + StepTimeout: "-1s", + RequestsFinished: &atomic.Bool{}, }, } diff --git a/filter/filter_impl/graceful_shutdown_filter.go b/filter/filter_impl/graceful_shutdown_filter.go index 028ffd2e1e..fd0a9f2abb 100644 --- a/filter/filter_impl/graceful_shutdown_filter.go +++ b/filter/filter_impl/graceful_shutdown_filter.go @@ -32,19 +32,13 @@ import ( ) func init() { - consumerFiler := &gracefulShutdownFilter{ - shutdownConfig: config.GetConsumerConfig().ShutdownConfig, - } - providerFilter := &gracefulShutdownFilter{ - shutdownConfig: config.GetProviderConfig().ShutdownConfig, - } - + // `init()` is performed before config.Load(), so shutdownConfig will be retrieved after config was loaded. extension.SetFilter(constant.CONSUMER_SHUTDOWN_FILTER, func() filter.Filter { - return consumerFiler + return &gracefulShutdownFilter{} }) extension.SetFilter(constant.PROVIDER_SHUTDOWN_FILTER, func() filter.Filter { - return providerFilter + return &gracefulShutdownFilter{} }) } @@ -61,7 +55,7 @@ func (gf *gracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.I } atomic.AddInt32(&gf.activeCount, 1) if gf.shutdownConfig != nil && gf.activeCount > 0 { - gf.shutdownConfig.RequestsFinished = false + gf.shutdownConfig.RequestsFinished.Store(false) } return invoker.Invoke(ctx, invocation) } @@ -71,11 +65,17 @@ func (gf *gracefulShutdownFilter) OnResponse(ctx context.Context, result protoco atomic.AddInt32(&gf.activeCount, -1) // although this isn't thread safe, it won't be a problem if the gf.rejectNewRequest() is true. if gf.shutdownConfig != nil && gf.activeCount <= 0 { - gf.shutdownConfig.RequestsFinished = true + gf.shutdownConfig.RequestsFinished.Store(true) } return result } +func (gf *gracefulShutdownFilter) Set(name string, conf interface{}) { + if shutdownConfig, ok := conf.(*config.ShutdownConfig); ok && name == config.GracefulShutdownFilterShutdownConfig { + gf.shutdownConfig = shutdownConfig + } +} + func (gf *gracefulShutdownFilter) rejectNewRequest() bool { if gf.shutdownConfig == nil { return false diff --git a/filter/filter_impl/graceful_shutdown_filter_test.go b/filter/filter_impl/graceful_shutdown_filter_test.go index 55e28e4251..89d00fbf6e 100644 --- a/filter/filter_impl/graceful_shutdown_filter_test.go +++ b/filter/filter_impl/graceful_shutdown_filter_test.go @@ -25,6 +25,7 @@ import ( import ( "github.com/stretchr/testify/assert" + "go.uber.org/atomic" ) import ( @@ -59,6 +60,7 @@ func TestGenericFilterInvoke(t *testing.T) { providerConfig.ShutdownConfig = &config.ShutdownConfig{ RejectRequest: true, RejectRequestHandler: "mock", + RequestsFinished: &atomic.Bool{}, } shutdownFilter.shutdownConfig = providerConfig.ShutdownConfig @@ -70,6 +72,6 @@ func TestGenericFilterInvoke(t *testing.T) { extension.SetRejectedExecutionHandler("mock", func() filter.RejectedExecutionHandler { return rejectHandler }) - assert.True(t, providerConfig.ShutdownConfig.RequestsFinished) + assert.True(t, providerConfig.ShutdownConfig.RequestsFinished.Load()) assert.Equal(t, rejectHandler, shutdownFilter.getRejectHandler()) } diff --git a/filter/filter_impl/tps_limit_filter_test.go b/filter/filter_impl/tps_limit_filter_test.go index b70611dbc4..e9f8660890 100644 --- a/filter/filter_impl/tps_limit_filter_test.go +++ b/filter/filter_impl/tps_limit_filter_test.go @@ -101,9 +101,8 @@ func TestGenericFilterInvokeWithDefaultTpsLimiterNotAllow(t *testing.T) { attch := make(map[string]interface{}) result := tpsFilter.Invoke(context.Background(), - protocol.NewBaseInvoker( - - invokeUrl), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) + protocol.NewBaseInvoker(invokeUrl), + invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) assert.Nil(t, result.Error()) assert.Nil(t, result.Result()) } diff --git a/go.sum b/go.sum index d1c240e824..09a5ce38d9 100644 --- a/go.sum +++ b/go.sum @@ -161,7 +161,6 @@ github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7 github.com/coreos/go-systemd/v22 v22.1.0 h1:kq/SbG2BCKLkDKkjQf5OWwKWUKj1lgs3lFI4PxnR5lg= github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= -github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=