diff --git a/config/config_loader_test.go b/config/config_loader_test.go index b0935a2885..59593ef20b 100644 --- a/config/config_loader_test.go +++ b/config/config_loader_test.go @@ -18,6 +18,7 @@ package config import ( + "context" "path/filepath" "sort" "sync" @@ -41,7 +42,9 @@ import ( "dubbo.apache.org/dubbo-go/v3/common/logger" "dubbo.apache.org/dubbo-go/v3/common/proxy/proxy_factory" "dubbo.apache.org/dubbo-go/v3/config_center" + "dubbo.apache.org/dubbo-go/v3/filter" "dubbo.apache.org/dubbo-go/v3/metadata/service" + "dubbo.apache.org/dubbo-go/v3/protocol" "dubbo.apache.org/dubbo-go/v3/registry" ) @@ -74,6 +77,13 @@ func TestConfigLoader(t *testing.T) { } func TestLoad(t *testing.T) { + extension.SetFilter(constant.CONSUMER_SHUTDOWN_FILTER, func() filter.Filter { + return &mockGracefulShutdownFilter{} + }) + extension.SetFilter(constant.PROVIDER_SHUTDOWN_FILTER, func() filter.Filter { + return &mockGracefulShutdownFilter{} + }) + doInitConsumer() doInitProvider() @@ -596,3 +606,17 @@ func ConvertURLArrToIntfArr(urls []*common.URL) []interface{} { } return res } + +type mockGracefulShutdownFilter struct{} + +func (f *mockGracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + panic("implement me") +} + +func (f *mockGracefulShutdownFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + panic("implement me") +} + +func (f *mockGracefulShutdownFilter) Set(name string, config interface{}) { + return +} diff --git a/config/config_setter.go b/config/config_setter.go new file mode 100644 index 0000000000..9f114f26d0 --- /dev/null +++ b/config/config_setter.go @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +const ( + GracefulShutdownFilterShutdownConfig = "GracefulShutdownFilterShutdownConfig" +) + +type Setter interface { + Set(name string, config interface{}) +} diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go index fd34e200be..51443c9d5a 100644 --- a/config/graceful_shutdown.go +++ b/config/graceful_shutdown.go @@ -59,6 +59,14 @@ func GracefulShutdownInit() { signal.Notify(signals, ShutdownSignals...) + // retrieve ShutdownConfig for gracefulShutdownFilter + if filter, ok := extension.GetFilter(constant.CONSUMER_SHUTDOWN_FILTER).(Setter); ok && GetConsumerConfig().ShutdownConfig != nil { + filter.Set(GracefulShutdownFilterShutdownConfig, GetConsumerConfig().ShutdownConfig) + } + if filter, ok := extension.GetFilter(constant.PROVIDER_SHUTDOWN_FILTER).(Setter); ok && GetProviderConfig().ShutdownConfig != nil { + filter.Set(GracefulShutdownFilterShutdownConfig, GetProviderConfig().ShutdownConfig) + } + go func() { select { case sig := <-signals: diff --git a/config/graceful_shutdown_test.go b/config/graceful_shutdown_test.go index 870302d288..920a268df2 100644 --- a/config/graceful_shutdown_test.go +++ b/config/graceful_shutdown_test.go @@ -24,10 +24,17 @@ import ( import ( "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/filter" "dubbo.apache.org/dubbo-go/v3/protocol" ) func TestGracefulShutdownInit(t *testing.T) { + extension.SetFilter(constant.CONSUMER_SHUTDOWN_FILTER, func() filter.Filter { + return &mockGracefulShutdownFilter{} + }) + extension.SetFilter(constant.PROVIDER_SHUTDOWN_FILTER, func() filter.Filter { + return &mockGracefulShutdownFilter{} + }) GracefulShutdownInit() } @@ -49,6 +56,8 @@ func TestBeforeShutdown(t *testing.T) { } // without configuration + consumerConfig = nil + providerConfig = nil BeforeShutdown() consumerConfig = &ConsumerConfig{ diff --git a/filter/filter_impl/graceful_shutdown_filter.go b/filter/filter_impl/graceful_shutdown_filter.go index 028ffd2e1e..9da238b9d5 100644 --- a/filter/filter_impl/graceful_shutdown_filter.go +++ b/filter/filter_impl/graceful_shutdown_filter.go @@ -32,19 +32,13 @@ import ( ) func init() { - consumerFiler := &gracefulShutdownFilter{ - shutdownConfig: config.GetConsumerConfig().ShutdownConfig, - } - providerFilter := &gracefulShutdownFilter{ - shutdownConfig: config.GetProviderConfig().ShutdownConfig, - } - + // `init()` is performed before config.Load(), so shutdownConfig will be retrieved after config was loaded. extension.SetFilter(constant.CONSUMER_SHUTDOWN_FILTER, func() filter.Filter { - return consumerFiler + return &gracefulShutdownFilter{} }) extension.SetFilter(constant.PROVIDER_SHUTDOWN_FILTER, func() filter.Filter { - return providerFilter + return &gracefulShutdownFilter{} }) } @@ -60,9 +54,6 @@ func (gf *gracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.I return gf.getRejectHandler().RejectedExecution(invoker.GetURL(), invocation) } atomic.AddInt32(&gf.activeCount, 1) - if gf.shutdownConfig != nil && gf.activeCount > 0 { - gf.shutdownConfig.RequestsFinished = false - } return invoker.Invoke(ctx, invocation) } @@ -70,12 +61,25 @@ func (gf *gracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.I func (gf *gracefulShutdownFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { atomic.AddInt32(&gf.activeCount, -1) // although this isn't thread safe, it won't be a problem if the gf.rejectNewRequest() is true. - if gf.shutdownConfig != nil && gf.activeCount <= 0 { + if gf.shutdownConfig != nil && gf.shutdownConfig.RejectRequest && gf.activeCount <= 0 { gf.shutdownConfig.RequestsFinished = true } return result } +func (gf *gracefulShutdownFilter) Set(name string, conf interface{}) { + switch name { + case config.GracefulShutdownFilterShutdownConfig: + if shutdownConfig, ok := conf.(*config.ShutdownConfig); !ok { + gf.shutdownConfig = shutdownConfig + return + } + logger.Warnf("the type of config for {%s} should be *config.ShutdownConfig", config.GracefulShutdownFilterShutdownConfig) + default: + // do nothing + } +} + func (gf *gracefulShutdownFilter) rejectNewRequest() bool { if gf.shutdownConfig == nil { return false diff --git a/filter/filter_impl/tps_limit_filter_test.go b/filter/filter_impl/tps_limit_filter_test.go index b70611dbc4..e9f8660890 100644 --- a/filter/filter_impl/tps_limit_filter_test.go +++ b/filter/filter_impl/tps_limit_filter_test.go @@ -101,9 +101,8 @@ func TestGenericFilterInvokeWithDefaultTpsLimiterNotAllow(t *testing.T) { attch := make(map[string]interface{}) result := tpsFilter.Invoke(context.Background(), - protocol.NewBaseInvoker( - - invokeUrl), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) + protocol.NewBaseInvoker(invokeUrl), + invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch)) assert.Nil(t, result.Error()) assert.Nil(t, result.Result()) } diff --git a/go.sum b/go.sum index d1c240e824..09a5ce38d9 100644 --- a/go.sum +++ b/go.sum @@ -161,7 +161,6 @@ github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7 github.com/coreos/go-systemd/v22 v22.1.0 h1:kq/SbG2BCKLkDKkjQf5OWwKWUKj1lgs3lFI4PxnR5lg= github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= -github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= diff --git a/metadata/report/etcd/report.go b/metadata/report/etcd/report.go index bec551c843..836b43b329 100644 --- a/metadata/report/etcd/report.go +++ b/metadata/report/etcd/report.go @@ -58,7 +58,7 @@ func (e *etcdMetadataReport) GetAppMetadata(metadataIdentifier *identifier.Subsc if err != nil { return nil, err } - + info := &common.MetadataInfo{} return info, json.Unmarshal([]byte(data), info) } @@ -70,7 +70,7 @@ func (e *etcdMetadataReport) PublishAppMetadata(metadataIdentifier *identifier.S if err == nil { err = e.client.Put(key, string(value)) } - + return err }