diff --git a/cluster/cluster/adaptivesvc/cluster.go b/cluster/cluster/adaptivesvc/cluster.go new file mode 100644 index 0000000000..71abf865d8 --- /dev/null +++ b/cluster/cluster/adaptivesvc/cluster.go @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package adaptivesvc + +import ( + "sync" +) + +import ( + clusterpkg "dubbo.apache.org/dubbo-go/v3/cluster/cluster" + "dubbo.apache.org/dubbo-go/v3/cluster/directory" + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/protocol" +) + +var ( + once sync.Once + instance clusterpkg.Cluster +) + +func init() { + extension.SetCluster(constant.ClusterKeyAdaptiveService, newAdaptiveServiceCluster) +} + +// adaptiveServiceCluster is a cluster for adaptive service. +type adaptiveServiceCluster struct{} + +func newAdaptiveServiceCluster() clusterpkg.Cluster { + if instance == nil { + once.Do(func() { + instance = &adaptiveServiceCluster{} + }) + } + return instance +} + +func (c *adaptiveServiceCluster) Join(directory directory.Directory) protocol.Invoker { + return clusterpkg.BuildInterceptorChain(newAdaptiveServiceClusterInvoker(directory)) +} diff --git a/cluster/cluster/adaptivesvc/cluster_invoker.go b/cluster/cluster/adaptivesvc/cluster_invoker.go new file mode 100644 index 0000000000..90384ba803 --- /dev/null +++ b/cluster/cluster/adaptivesvc/cluster_invoker.go @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package adaptivesvc + +import ( + "context" + "strconv" +) + +import ( + perrors "github.com/pkg/errors" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/cluster/cluster/base" + "dubbo.apache.org/dubbo-go/v3/cluster/directory" + "dubbo.apache.org/dubbo-go/v3/cluster/metrics" + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/common/logger" + "dubbo.apache.org/dubbo-go/v3/protocol" +) + +type adaptiveServiceClusterInvoker struct { + base.ClusterInvoker +} + +func newAdaptiveServiceClusterInvoker(directory directory.Directory) protocol.Invoker { + return &adaptiveServiceClusterInvoker{ + ClusterInvoker: base.NewClusterInvoker(directory), + } +} + +func (ivk *adaptiveServiceClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { + invokers := ivk.Directory.List(invocation) + if err := ivk.CheckInvokers(invokers, invocation); err != nil { + return protocol.NewRPCResult(nil, err) + } + + // get loadBalance + lbKey := invokers[0].GetURL().GetParam(constant.LoadbalanceKey, constant.LoadBalanceKeyP2C) + if lbKey != constant.LoadBalanceKeyP2C { + return protocol.NewRPCResult(nil, perrors.Errorf("adaptive service not supports %s load balance", lbKey)) + } + lb := extension.GetLoadbalance(lbKey) + + // select a node by the loadBalance + invoker := lb.Select(invokers, invocation) + + // invoke + result := invoker.Invoke(ctx, invocation) + + // update metrics + remainingStr := result.Attachment(constant.AdaptiveServiceRemainingKey, "").(string) + remaining, err := strconv.Atoi(remainingStr) + if err != nil { + logger.Warnf("the remaining is unexpected, we need a int type, but we got %s, err: %v.", remainingStr, err) + return result + } + logger.Debugf("[adasvc cluster] The server status was received successfully, %s: %#v", + constant.AdaptiveServiceRemainingKey, remainingStr) + err = metrics.LocalMetrics.SetMethodMetrics(invoker.GetURL(), + invocation.MethodName(), metrics.HillClimbing, uint64(remaining)) + if err != nil { + logger.Warnf("adaptive service metrics update is failed, err: %v", err) + return protocol.NewRPCResult(nil, err) + } + + return result +} diff --git a/cluster/cluster/available/cluster_invoker.go b/cluster/cluster/available/cluster_invoker.go index fc6dea3170..1a20a3fdb3 100644 --- a/cluster/cluster/available/cluster_invoker.go +++ b/cluster/cluster/available/cluster_invoker.go @@ -47,12 +47,12 @@ func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.I invokers := invoker.Directory.List(invocation) err := invoker.CheckInvokers(invokers, invocation) if err != nil { - return &protocol.RPCResult{Err: err} + return protocol.NewRPCResult(nil, err) } err = invoker.CheckWhetherDestroyed() if err != nil { - return &protocol.RPCResult{Err: err} + return protocol.NewRPCResult(nil, err) } for _, ivk := range invokers { @@ -60,5 +60,5 @@ func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.I return ivk.Invoke(ctx, invocation) } } - return &protocol.RPCResult{Err: errors.New(fmt.Sprintf("no provider available in %v", invokers))} + return protocol.NewRPCResult(nil, errors.New(fmt.Sprintf("no provider available in %v", invokers))) } diff --git a/cluster/cluster/available/cluster_invoker_test.go b/cluster/cluster/available/cluster_invoker_test.go index ca60a8d792..36dcda7189 100644 --- a/cluster/cluster/available/cluster_invoker_test.go +++ b/cluster/cluster/available/cluster_invoker_test.go @@ -65,7 +65,7 @@ func TestAvailableClusterInvokerSuccess(t *testing.T) { invoker := mock.NewMockInvoker(ctrl) clusterInvoker := registerAvailable(invoker) - mockResult := &protocol.RPCResult{Rest: clusterpkg.Rest{Tried: 0, Success: true}} + mockResult := protocol.NewRPCResult(clusterpkg.Rest{Tried: 0, Success: true}, nil) invoker.EXPECT().IsAvailable().Return(true) invoker.EXPECT().Invoke(gomock.Any()).Return(mockResult) diff --git a/cluster/cluster_impl/import.go b/cluster/cluster_impl/import.go index 68908ab6e2..8ba974918a 100644 --- a/cluster/cluster_impl/import.go +++ b/cluster/cluster_impl/import.go @@ -21,6 +21,7 @@ package cluster_impl // This package may be DEPRECATED OR REMOVED in the future. import ( + _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/adaptivesvc" _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/available" _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/broadcast" _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/failback" diff --git a/cluster/loadbalance/p2c/loadbalance.go b/cluster/loadbalance/p2c/loadbalance.go new file mode 100644 index 0000000000..3b3678aaeb --- /dev/null +++ b/cluster/loadbalance/p2c/loadbalance.go @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package p2c + +import ( + "errors" + "fmt" + "math/rand" + "sync" + "time" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance" + "dubbo.apache.org/dubbo-go/v3/cluster/metrics" + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/common/logger" + "dubbo.apache.org/dubbo-go/v3/protocol" +) + +func init() { + extension.SetLoadbalance(constant.LoadBalanceKeyP2C, newLoadBalance) +} + +var ( + once sync.Once + instance loadbalance.LoadBalance +) + +type loadBalance struct{} + +func newLoadBalance() loadbalance.LoadBalance { + if instance == nil { + once.Do(func() { + instance = &loadBalance{} + }) + } + return instance +} + +func (l *loadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker { + if len(invokers) == 0 { + return nil + } + if len(invokers) == 1 { + return invokers[0] + } + // m is the Metrics, which saves the metrics of instance, invokers and methods + // The local metrics is available only for the earlier version. + m := metrics.LocalMetrics + // picks two nodes randomly + var i, j int + if len(invokers) == 2 { + i, j = 0, 1 + } else { + rand.Seed(time.Now().Unix()) + i = rand.Intn(len(invokers)) + j = i + for i == j { + j = rand.Intn(len(invokers)) + } + } + logger.Debugf("[P2C select] Two invokers were selected, i: %d, j: %d, invoker[i]: %s, invoker[j]: %s.", + i, j, invokers[i], invokers[j]) + + // TODO(justxuewei): please consider how to get the real method name from $invoke, + // see also [#1511](https://github.com/apache/dubbo-go/issues/1511) + methodName := invocation.MethodName() + // remainingIIface, remainingJIface means remaining capacity of node i and node j. + // If one of the metrics is empty, invoke the invocation to that node directly. + remainingIIface, err := m.GetMethodMetrics(invokers[i].GetURL(), methodName, metrics.HillClimbing) + if err != nil { + if errors.Is(err, metrics.ErrMetricsNotFound) { + logger.Debugf("[P2C select] The invoker[i] was selected, because it hasn't been selected before.") + return invokers[i] + } + logger.Warnf("get method metrics err: %v", err) + return nil + } + + remainingJIface, err := m.GetMethodMetrics(invokers[j].GetURL(), methodName, metrics.HillClimbing) + if err != nil { + if errors.Is(err, metrics.ErrMetricsNotFound) { + logger.Debugf("[P2C select] The invoker[j] was selected, because it hasn't been selected before.") + return invokers[j] + } + logger.Warnf("get method metrics err: %v", err) + return nil + } + + // Convert interface to int, if the type is unexpected, panic immediately + remainingI, ok := remainingIIface.(uint64) + if !ok { + panic(fmt.Sprintf("[P2C select] the type of %s expects to be uint64, but gets %T", + metrics.HillClimbing, remainingIIface)) + } + + remainingJ, ok := remainingJIface.(uint64) + if !ok { + panic(fmt.Sprintf("the type of %s expects to be uint64, but gets %T", metrics.HillClimbing, remainingJIface)) + } + + logger.Debugf("[P2C select] The invoker[i] remaining is %d, and the invoker[j] is %d.", remainingI, remainingJ) + + // For the remaining capacity, the bigger, the better. + if remainingI > remainingJ { + logger.Debugf("[P2C select] The invoker[i] was selected.") + return invokers[i] + } + + logger.Debugf("[P2C select] The invoker[j] was selected.") + return invokers[j] +} diff --git a/cluster/metrics/constants.go b/cluster/metrics/constants.go new file mode 100644 index 0000000000..2bc6b4fea2 --- /dev/null +++ b/cluster/metrics/constants.go @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metrics + +const ( + HillClimbing = "hill-climbing" +) diff --git a/cluster/metrics/local_metrics.go b/cluster/metrics/local_metrics.go new file mode 100644 index 0000000000..e9f976a193 --- /dev/null +++ b/cluster/metrics/local_metrics.go @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metrics + +import ( + "fmt" + "sync" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common" +) + +var LocalMetrics Metrics + +func init() { + LocalMetrics = newLocalMetrics() +} + +type localMetrics struct { + // protect metrics + lock *sync.RWMutex + metrics map[string]interface{} +} + +func newLocalMetrics() *localMetrics { + return &localMetrics{ + lock: new(sync.RWMutex), + metrics: make(map[string]interface{}), + } +} + +func (m *localMetrics) GetMethodMetrics(url *common.URL, methodName, key string) (interface{}, error) { + m.lock.RLock() + defer m.lock.RUnlock() + metricsKey := fmt.Sprintf("%s.%s.%s.%s", getInstanceKey(url), getInvokerKey(url), methodName, key) + if metrics, ok := m.metrics[metricsKey]; ok { + return metrics, nil + } + return nil, ErrMetricsNotFound +} + +func (m *localMetrics) SetMethodMetrics(url *common.URL, methodName, key string, value interface{}) error { + m.lock.Lock() + defer m.lock.Unlock() + metricsKey := fmt.Sprintf("%s.%s.%s.%s", getInstanceKey(url), getInvokerKey(url), methodName, key) + m.metrics[metricsKey] = value + return nil +} + +func (m *localMetrics) GetInvokerMetrics(url *common.URL, key string) (interface{}, error) { + panic("implement me") +} + +func (m *localMetrics) SetInvokerMetrics(url *common.URL, key string, value interface{}) error { + panic("implement me") +} + +func (m *localMetrics) GetInstanceMetrics(url *common.URL, key string) (interface{}, error) { + panic("implement me") +} + +func (m *localMetrics) SetInstanceMetrics(url *common.URL, key string, value interface{}) error { + panic("implement me") +} diff --git a/cluster/metrics/metrics.go b/cluster/metrics/metrics.go new file mode 100644 index 0000000000..0f61dcf1c2 --- /dev/null +++ b/cluster/metrics/metrics.go @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metrics + +import ( + "github.com/pkg/errors" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common" +) + +var ( + ErrMetricsNotFound = errors.New("metrics not found") +) + +type Metrics interface { + // GetMethodMetrics returns method-level metrics, the format of key is "{instance key}.{invoker key}.{method key}.{key}" + // url is invoker's url, which contains information about instance and invoker. + // methodName is the method name. + // key is the key of the metrics. + GetMethodMetrics(url *common.URL, methodName, key string) (interface{}, error) + SetMethodMetrics(url *common.URL, methodName, key string, value interface{}) error + + // GetInvokerMetrics returns invoker-level metrics, the format of key is "{instance key}.{invoker key}.{key}" + // DO NOT IMPLEMENT FOR EARLIER VERSION + GetInvokerMetrics(url *common.URL, key string) (interface{}, error) + SetInvokerMetrics(url *common.URL, key string, value interface{}) error + + // GetInstanceMetrics returns instance-level metrics, the format of key is "{instance key}.{key}" + // DO NOT IMPLEMENT FOR EARLIER VERSION + GetInstanceMetrics(url *common.URL, key string) (interface{}, error) + SetInstanceMetrics(url *common.URL, key string, value interface{}) error +} diff --git a/cluster/metrics/utils.go b/cluster/metrics/utils.go new file mode 100644 index 0000000000..65cc9e249d --- /dev/null +++ b/cluster/metrics/utils.go @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metrics + +import ( + "fmt" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common" +) + +func getInvokerKey(url *common.URL) string { + return url.Path +} + +func getInstanceKey(url *common.URL) string { + return fmt.Sprintf("%s:%s", url.Ip, url.Port) +} diff --git a/common/constant/cluster.go b/common/constant/cluster.go index 14f3375317..81af3ec882 100644 --- a/common/constant/cluster.go +++ b/common/constant/cluster.go @@ -18,12 +18,13 @@ package constant const ( - ClusterKeyAvailable = "available" - ClusterKeyBroadcast = "broadcast" - ClusterKeyFailback = "failback" - ClusterKeyFailfast = "failfast" - ClusterKeyFailover = "failover" - ClusterKeyFailsafe = "failsafe" - ClusterKeyForking = "forking" - ClusterKeyZoneAware = "zoneAware" + ClusterKeyAvailable = "available" + ClusterKeyBroadcast = "broadcast" + ClusterKeyFailback = "failback" + ClusterKeyFailfast = "failfast" + ClusterKeyFailover = "failover" + ClusterKeyFailsafe = "failsafe" + ClusterKeyForking = "forking" + ClusterKeyZoneAware = "zoneAware" + ClusterKeyAdaptiveService = "adaptiveService" ) diff --git a/common/constant/default.go b/common/constant/default.go index b0c3874932..2bd96e9905 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -48,13 +48,21 @@ const ( ) const ( - DefaultKey = "default" - PrefixDefaultKey = "default." - DefaultServiceFilters = EchoFilterKey + "," + MetricsFilterKey + "," + TokenFilterKey + "," + AccessLogFilterKey + "," + TpsLimitFilterKey + "," + GenericServiceFilterKey + "," + ExecuteLimitFilterKey + "," + GracefulShutdownProviderFilterKey + DefaultKey = "default" + Generic = "$invoke" + Echo = "$echo" +) + +// default filters +const ( + // DefaultServiceFilters defines default service filters, it is highly recommended + // that put the AdaptiveServiceProviderFilterKey at the end. + DefaultServiceFilters = EchoFilterKey + "," + + MetricsFilterKey + "," + TokenFilterKey + "," + AccessLogFilterKey + "," + TpsLimitFilterKey + "," + + GenericServiceFilterKey + "," + ExecuteLimitFilterKey + "," + GracefulShutdownProviderFilterKey + "," + + AdaptiveServiceProviderFilterKey + DefaultReferenceFilters = GracefulShutdownConsumerFilterKey - GenericReferenceFilters = GenericFilterKey - Generic = "$invoke" - Echo = "$echo" ) const ( diff --git a/common/constant/key.go b/common/constant/key.go index 4c55383d03..067ccf03bc 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -65,6 +65,7 @@ const ( const ( AccessLogFilterKey = "accesslog" ActiveFilterKey = "active" + AdaptiveServiceProviderFilterKey = "padasvc" AuthConsumerFilterKey = "sign" AuthProviderFilterKey = "auth" EchoFilterKey = "echo" @@ -354,10 +355,19 @@ const ( ) // Generic Filter - const ( GenericSerializationDefault = "true" // disable "protobuf-json" temporarily //GenericSerializationProtobuf = "protobuf-json" GenericSerializationGson = "gson" ) + +// AdaptiveService Filter +// goland:noinspection ALL +const ( + // attribute keys + AdaptiveServiceUpdaterKey = "adaptive-service.updater" + // attachment keys + AdaptiveServiceRemainingKey = "adaptive-service.remaining" + AdaptiveServiceInflightKey = "adaptive-service.inflight" +) diff --git a/common/constant/loadbalance.go b/common/constant/loadbalance.go index 7b1d8ea195..82c5771f9a 100644 --- a/common/constant/loadbalance.go +++ b/common/constant/loadbalance.go @@ -22,4 +22,5 @@ const ( LoadBalanceKeyLeastActive = "leastactive" LoadBalanceKeyRandom = "random" LoadBalanceKeyRoundRobin = "roundrobin" + LoadBalanceKeyP2C = "p2c" ) diff --git a/common/proxy/proxy.go b/common/proxy/proxy.go index c32eedc57c..1676e75fb4 100644 --- a/common/proxy/proxy.go +++ b/common/proxy/proxy.go @@ -204,12 +204,12 @@ func DefaultProxyImplementFunc(p *Proxy, v common.RPCService) { err = perrors.Cause(err) // if some error happened, it should be log some info in the separate file. if throwabler, ok := err.(java_exception.Throwabler); ok { - logger.Warnf("invoke service throw exception: %v , stackTraceElements: %v", err.Error(), throwabler.GetStackTrace()) + logger.Warnf("[CallProxy] invoke service throw exception: %v , stackTraceElements: %v", err.Error(), throwabler.GetStackTrace()) } else { - logger.Warnf("result err: %v", err) + logger.Warnf("[CallProxy] received rpc err: %v", err) } } else { - logger.Debugf("[makeDubboCallProxy] result: %v, err: %v", result.Result(), err) + logger.Debugf("[CallProxy] received rpc result successfully: %s", result) } if len(outs) == 1 { return []reflect.Value{reflect.ValueOf(&err).Elem()} diff --git a/config/consumer_config.go b/config/consumer_config.go index 2396aabf09..f9344f32cb 100644 --- a/config/consumer_config.go +++ b/config/consumer_config.go @@ -46,6 +46,8 @@ type ConsumerConfig struct { RequestTimeout string `default:"3s" yaml:"request-timeout" json:"request-timeout,omitempty" property:"request-timeout"` ProxyFactory string `default:"default" yaml:"proxy" json:"proxy,omitempty" property:"proxy"` Check bool `yaml:"check" json:"check,omitempty" property:"check"` + // adaptive service + AdaptiveService bool `default:"false" yaml:"adaptive-service" json:"adaptive-service" property:"adaptive-service"` References map[string]*ReferenceConfig `yaml:"references" json:"references,omitempty" property:"references"` TracingKey string `yaml:"tracing-key" json:"tracing-key" property:"tracing-key"` diff --git a/config/provider_config.go b/config/provider_config.go index 6133791d1c..bd7cb71707 100644 --- a/config/provider_config.go +++ b/config/provider_config.go @@ -31,6 +31,7 @@ import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/logger" + aslimiter "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc/limiter" ) // ProviderConfig is the default configuration of service provider @@ -43,14 +44,13 @@ type ProviderConfig struct { // TracingKey is tracing ids list TracingKey string `yaml:"tracing-key" json:"tracing-key" property:"tracing-key"` // Services services - Services map[string]*ServiceConfig `yaml:"services" json:"services,omitempty" property:"services"` - - ProxyFactory string `default:"default" yaml:"proxy" json:"proxy,omitempty" property:"proxy"` - - FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf"` - ConfigType map[string]string `yaml:"config_type" json:"config_type,omitempty" property:"config_type"` - - rootConfig *RootConfig + Services map[string]*ServiceConfig `yaml:"services" json:"services,omitempty" property:"services"` + ProxyFactory string `default:"default" yaml:"proxy" json:"proxy,omitempty" property:"proxy"` + FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf"` + ConfigType map[string]string `yaml:"config_type" json:"config_type,omitempty" property:"config_type"` + // adaptive service + AdaptiveServiceVerbose bool `default:"false" yaml:"adaptive-service-verbose" json:"adaptive-service-verbose" property:"adaptive-service-verbose"` + rootConfig *RootConfig } func (ProviderConfig) Prefix() string { @@ -115,6 +115,12 @@ func (c *ProviderConfig) Init(rc *RootConfig) error { if err := c.check(); err != nil { return err } + // enable adaptive service verbose + if c.AdaptiveServiceVerbose { + logger.Infof("adaptive service verbose is enabled.") + logger.Debugf("debug-level info could be shown.") + aslimiter.Verbose = true + } return nil } diff --git a/config/reference_config.go b/config/reference_config.go index 9c7402460f..1717e92c5a 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -73,7 +73,6 @@ type ReferenceConfig struct { metaDataType string } -// nolint func (rc *ReferenceConfig) Prefix() string { return constant.ReferenceConfigPrefix + rc.InterfaceName + "." } @@ -107,8 +106,16 @@ func (rc *ReferenceConfig) Init(root *RootConfig) error { return verify(rc) } -// Refer ... +// Refer retrieves invokers from urls. func (rc *ReferenceConfig) Refer(srv interface{}) { + // If adaptive service is enabled, + // the cluster and load balance should be overridden to "adaptivesvc" and "p2c" respectively. + if rc.rootConfig.Consumer.AdaptiveService { + rc.Cluster = constant.ClusterKeyAdaptiveService + rc.Loadbalance = constant.LoadBalanceKeyP2C + } + + // cfgURL is an interface-level invoker url, in the other words, it represents an interface. cfgURL := common.NewURLWithOptions( common.WithPath(rc.InterfaceName), common.WithProtocol(rc.Protocol), @@ -116,6 +123,7 @@ func (rc *ReferenceConfig) Refer(srv interface{}) { common.WithParamsValue(constant.BeanNameKey, rc.id), common.WithParamsValue(constant.MetadataTypeKey, rc.metaDataType), ) + SetConsumerServiceByInterfaceName(rc.InterfaceName, srv) if rc.ForceTag { cfgURL.AddParam(constant.ForceUseTag, "true") @@ -125,10 +133,15 @@ func (rc *ReferenceConfig) Refer(srv interface{}) { // retrieving urls from config, and appending the urls to rc.urls if rc.URL != "" { // use user-specific urls /* - Two types of URL are allowed for rc.URL: direct url and registry url, they will be handled in different ways. - For example, "tri://localhost:10000" is a direct url, and "registry://localhost:2181" is a registry url. - rc.URL: "tri://localhost:10000;tri://localhost:10001;registry://localhost:2181", - urlStrings = []string{"tri://localhost:10000", "tri://localhost:10001", "registry://localhost:2181"}. + Two types of URL are allowed for rc.URL: + 1. direct url: server IP, that is, no need for a registry anymore + 2. registry url + They will be handled in different ways: + For example, we have a direct url and a registry url: + 1. "tri://localhost:10000" is a direct url + 2. "registry://localhost:2181" is a registry url. + Then, rc.URL looks like a string separated by semicolon: "tri://localhost:10000;registry://localhost:2181". + The result of urlStrings is a string array: []string{"tri://localhost:10000", "registry://localhost:2181"}. */ urlStrings := gxstrings.RegSplit(rc.URL, "\\s*[;]+\\s*") for _, urlStr := range urlStrings { @@ -136,14 +149,15 @@ func (rc *ReferenceConfig) Refer(srv interface{}) { if err != nil { panic(fmt.Sprintf("url configuration error, please check your configuration, user specified URL %v refer error, error message is %v ", urlStr, err.Error())) } - if serviceURL.Protocol == constant.RegistryProtocol { // URL stands for a registry protocol + if serviceURL.Protocol == constant.RegistryProtocol { // serviceURL in this branch is a registry protocol serviceURL.SubURL = cfgURL rc.urls = append(rc.urls, serviceURL) - } else { // URL stands for a direct address + } else { // serviceURL in this branch is the target endpoint IP address if serviceURL.Path == "" { serviceURL.Path = "/" + rc.InterfaceName } - // merge URL param with cfgURL, others are same as serviceURL + // replace params of serviceURL with params of cfgUrl + // other stuff, e.g. IP, port, etc., are same as serviceURL newURL := common.MergeURL(serviceURL, cfgURL) rc.urls = append(rc.urls, newURL) } @@ -272,7 +286,7 @@ func (rc *ReferenceConfig) getURLMap() url.Values { // filter defaultReferenceFilter := constant.DefaultReferenceFilters if rc.Generic != "" { - defaultReferenceFilter = constant.GenericReferenceFilters + "," + defaultReferenceFilter + defaultReferenceFilter = constant.GenericFilterKey + "," + defaultReferenceFilter } urlMap.Set(constant.ReferenceFilterKey, mergeValue(rc.rootConfig.Consumer.Filter, "", defaultReferenceFilter)) diff --git a/filter/adaptivesvc/filter.go b/filter/adaptivesvc/filter.go new file mode 100644 index 0000000000..04dde942ba --- /dev/null +++ b/filter/adaptivesvc/filter.go @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package adaptivesvc + +import ( + "context" + "fmt" + "sync" +) + +import ( + "github.com/pkg/errors" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common/constant" + "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/common/logger" + "dubbo.apache.org/dubbo-go/v3/filter" + "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc/limiter" + "dubbo.apache.org/dubbo-go/v3/protocol" +) + +var ( + adaptiveServiceProviderFilterOnce sync.Once + instance filter.Filter + + ErrUpdaterNotFound = fmt.Errorf("updater not found") + ErrUnexpectedUpdaterType = fmt.Errorf("unexpected updater type") +) + +func init() { + extension.SetFilter(constant.AdaptiveServiceProviderFilterKey, newAdaptiveServiceProviderFilter) +} + +// adaptiveServiceProviderFilter is for adaptive service on the provider side. +type adaptiveServiceProviderFilter struct{} + +func newAdaptiveServiceProviderFilter() filter.Filter { + if instance == nil { + adaptiveServiceProviderFilterOnce.Do(func() { + instance = &adaptiveServiceProviderFilter{} + }) + } + return instance +} + +func (f *adaptiveServiceProviderFilter) Invoke(ctx context.Context, invoker protocol.Invoker, + invocation protocol.Invocation) protocol.Result { + + l, err := limiterMapperSingleton.getMethodLimiter(invoker.GetURL(), invocation.MethodName()) + if err != nil { + if errors.Is(err, ErrLimiterNotFoundOnMapper) { + // limiter is not found on the mapper, just create + // a new limiter + if l, err = limiterMapperSingleton.newAndSetMethodLimiter(invoker.GetURL(), + invocation.MethodName(), limiter.HillClimbingLimiter); err != nil { + return &protocol.RPCResult{Err: err} + } + } else { + // unexpected errors + return &protocol.RPCResult{Err: err} + } + } + + updater, err := l.Acquire() + if err != nil { + return &protocol.RPCResult{Err: err} + } + + invocation.Attributes()[constant.AdaptiveServiceUpdaterKey] = updater + + return invoker.Invoke(ctx, invocation) +} + +func (f *adaptiveServiceProviderFilter) OnResponse(_ context.Context, result protocol.Result, invoker protocol.Invoker, + invocation protocol.Invocation) protocol.Result { + // get updater from the attributes + updaterIface := invocation.AttributeByKey(constant.AdaptiveServiceUpdaterKey, nil) + if updaterIface == nil { + return &protocol.RPCResult{Err: ErrUpdaterNotFound} + } + updater, ok := updaterIface.(limiter.Updater) + if !ok { + return &protocol.RPCResult{Err: ErrUnexpectedUpdaterType} + } + + err := updater.DoUpdate() + if err != nil { + // DoUpdate was failed, but the invocation is not failed. + // Printing the error to logs is better than returning a + // result with an error. + logger.Errorf("[adasvc filter] The DoUpdate method was failed, err: %s.", err) + } + + // get limiter for the mapper + l, err := limiterMapperSingleton.getMethodLimiter(invoker.GetURL(), invocation.MethodName()) + if err != nil { + return &protocol.RPCResult{Err: err} + } + + // set attachments to inform consumer of provider status + result.AddAttachment(constant.AdaptiveServiceRemainingKey, fmt.Sprintf("%d", l.Remaining())) + result.AddAttachment(constant.AdaptiveServiceInflightKey, fmt.Sprintf("%d", l.Inflight())) + logger.Debugf("[adasvc filter] The attachments are set, %s: %d, %s: %d.", + constant.AdaptiveServiceRemainingKey, l.Remaining(), + constant.AdaptiveServiceInflightKey, l.Inflight()) + + return result +} diff --git a/filter/adaptivesvc/limiter/hill_climbing.go b/filter/adaptivesvc/limiter/hill_climbing.go new file mode 100644 index 0000000000..070a6dc735 --- /dev/null +++ b/filter/adaptivesvc/limiter/hill_climbing.go @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package limiter + +import ( + "math" + "sync" + "time" +) + +import ( + "go.uber.org/atomic" +) + +var ( + _ Limiter = (*HillClimbing)(nil) + _ Updater = (*HillClimbingUpdater)(nil) +) + +type HillClimbingOption int64 + +const ( + HillClimbingOptionShrinkPlus HillClimbingOption = -2 + HillClimbingOptionShrink HillClimbingOption = -1 + HillClimbingOptionDoNothing HillClimbingOption = 0 + HillClimbingOptionExtend HillClimbingOption = 1 + HillClimbingOptionExtendPlus HillClimbingOption = 2 +) + +var ( + initialLimitation uint64 = 50 + maxLimitation uint64 = 500 + radicalPeriod uint64 = 1000 + stablePeriod uint64 = 32000 +) + +// HillClimbing is a limiter using HillClimbing algorithm +type HillClimbing struct { + seq *atomic.Uint64 + round *atomic.Uint64 + + inflight *atomic.Uint64 + limitation *atomic.Uint64 + + mutex *sync.Mutex + // nextUpdateTime = lastUpdatedTime + updateInterval + updateInterval *atomic.Uint64 + lastUpdatedTime *atomic.Time + + // indicators of the current round + successCounter *atomic.Uint64 + rttAvg *atomic.Float64 + + // indicators of history + bestConcurrency *atomic.Uint64 + bestRTTAvg *atomic.Float64 + bestLimitation *atomic.Uint64 + bestSuccessRate *atomic.Uint64 +} + +func NewHillClimbing() Limiter { + l := &HillClimbing{ + seq: new(atomic.Uint64), + round: new(atomic.Uint64), + inflight: new(atomic.Uint64), + limitation: atomic.NewUint64(initialLimitation), + mutex: new(sync.Mutex), + updateInterval: atomic.NewUint64(radicalPeriod), + lastUpdatedTime: atomic.NewTime(time.Now()), + successCounter: new(atomic.Uint64), + rttAvg: new(atomic.Float64), + bestConcurrency: new(atomic.Uint64), + bestRTTAvg: new(atomic.Float64), + bestLimitation: new(atomic.Uint64), + bestSuccessRate: new(atomic.Uint64), + } + + return l +} + +func (l *HillClimbing) Inflight() uint64 { + return l.inflight.Load() +} + +func (l *HillClimbing) Remaining() uint64 { + limitation := l.limitation.Load() + inflight := l.Inflight() + if limitation < inflight { + return 0 + } + return limitation - inflight +} + +func (l *HillClimbing) Acquire() (Updater, error) { + if l.Remaining() == 0 { + return nil, ErrReachLimitation + } + return NewHillClimbingUpdater(l), nil +} + +type HillClimbingUpdater struct { + startTime time.Time + limiter *HillClimbing + + // for debug purposes + seq uint64 +} + +func NewHillClimbingUpdater(limiter *HillClimbing) *HillClimbingUpdater { + inflight := limiter.inflight.Add(1) + u := &HillClimbingUpdater{ + startTime: time.Now(), + limiter: limiter, + seq: limiter.seq.Add(1) - 1, + } + VerboseDebugf("[NewHillClimbingUpdater] A new request arrived, seq: %d, inflight: %d, time: %s.", + u.seq, inflight, u.startTime.String()) + return u +} + +func (u *HillClimbingUpdater) DoUpdate() error { + defer func() { + u.limiter.inflight.Dec() + }() + VerboseDebugf("[HillClimbingUpdater] A request finished, the limiter will be updated, seq: %d.", u.seq) + + rtt := uint64(time.Now().Sub(u.startTime)) + inflight := u.limiter.Inflight() + + option, err := u.getOption(rtt, inflight) + if err != nil { + return err + } + if err = u.adjustLimitation(option); err != nil { + return err + } + return nil +} + +func (u *HillClimbingUpdater) getOption(rtt, _ uint64) (HillClimbingOption, error) { + u.limiter.mutex.Lock() + defer u.limiter.mutex.Unlock() + + now := time.Now() + option := HillClimbingOptionDoNothing + + lastUpdatedTime := u.limiter.lastUpdatedTime.Load() + updateInterval := u.limiter.updateInterval.Load() + rttAvg := u.limiter.rttAvg.Load() + successCounter := u.limiter.successCounter.Load() + limitation := u.limiter.limitation.Load() + + if now.Sub(lastUpdatedTime) > time.Duration(updateInterval) || + rttAvg == 0 { + // Current req is at the next round or no rttAvg. + + // FIXME(justxuewei): If all requests in one round + // not receive responses, rttAvg will be 0, and + // concurrency will be 0 as well, the actual + // concurrency, however, is not 0. + concurrency := float64(successCounter) * rttAvg / float64(updateInterval) + + // Consider extending limitation if concurrent is + // about to reach the limitation. + if uint64(concurrency*1.5) > limitation { + if updateInterval == radicalPeriod { + option = HillClimbingOptionExtendPlus + } else { + option = HillClimbingOptionExtend + } + } + + successRate := uint64(1000.0 * float64(successCounter) / float64(updateInterval)) + + if successRate > u.limiter.bestSuccessRate.Load() { + // successRate is the best in the history, update + // all best-indicators. + u.limiter.bestSuccessRate.Store(successRate) + u.limiter.bestRTTAvg.Store(rttAvg) + u.limiter.bestConcurrency.Store(uint64(concurrency)) + u.limiter.bestLimitation.Store(u.limiter.limitation.Load()) + VerboseDebugf("[HillClimbingUpdater] Best-indicators are up-to-date, "+ + "seq: %d, bestSuccessRate: %d, bestRTTAvg: %.4f, bestConcurrency: %d,"+ + " bestLimitation: %d.", u.seq, u.limiter.bestSuccessRate.Load(), + u.limiter.bestRTTAvg.Load(), u.limiter.bestConcurrency.Load(), + u.limiter.bestLimitation.Load()) + } else { + if u.shouldShrink(successCounter, uint64(concurrency), successRate, rttAvg) { + if updateInterval == radicalPeriod { + option = HillClimbingOptionShrinkPlus + } else { + option = HillClimbingOptionShrink + } + // shrinking limitation means the process of adjusting + // limitation goes to stable, so extends the update + // interval to avoid adjusting frequently. + u.limiter.updateInterval.Store(minUint64(updateInterval*2, stablePeriod)) + } + } + + // reset indicators for the new round + u.limiter.successCounter.Store(0) + u.limiter.rttAvg.Store(float64(rtt)) + u.limiter.lastUpdatedTime.Store(time.Now()) + VerboseDebugf("[HillClimbingUpdater] A new round is applied, all indicators are reset.") + } else { + // still in the current round + + u.limiter.successCounter.Add(1) + // ra = (ra * c + r) / (c + 1), where ra denotes rttAvg, + // c denotes successCounter, r denotes rtt. + u.limiter.rttAvg.Store((rttAvg*float64(successCounter) + float64(rtt)) / float64(successCounter+1)) + option = HillClimbingOptionDoNothing + } + + return option, nil +} + +func (u *HillClimbingUpdater) shouldShrink(counter, concurrency, successRate uint64, rttAvg float64) bool { + bestSuccessRate := u.limiter.bestSuccessRate.Load() + bestRTTAvg := u.limiter.bestRTTAvg.Load() + + diff := bestSuccessRate - successRate + diffPct := uint64(100.0 * float64(successRate) / float64(bestSuccessRate)) + + if diff <= 300 && diffPct <= 10 { + // diff is acceptable, shouldn't shrink + return false + } + + if concurrency > bestSuccessRate || rttAvg > bestRTTAvg { + // The unacceptable diff dues to too large + // concurrency or rttAvg. + concDiff := concurrency - bestSuccessRate + concDiffPct := uint64(100.0 * float64(concurrency) / float64(bestSuccessRate)) + rttAvgDiff := rttAvg - bestRTTAvg + rttAvgPctDiff := uint64(100.0 * rttAvg / bestRTTAvg) + + // TODO(justxuewei): Hard-coding here is not proper, but + // it should refactor after testing. + var ( + rttAvgDiffThreshold uint64 + rttAvgPctDiffThreshold uint64 + ) + if bestRTTAvg < 5 { + rttAvgDiffThreshold = 3 + rttAvgPctDiffThreshold = 80 + } else if bestRTTAvg < 10 { + rttAvgDiffThreshold = 2 + rttAvgPctDiffThreshold = 30 + } else if bestRTTAvg < 50 { + rttAvgDiffThreshold = 5 + rttAvgPctDiffThreshold = 20 + } else if bestRTTAvg < 100 { + rttAvgDiffThreshold = 10 + rttAvgPctDiffThreshold = 10 + } else { + rttAvgDiffThreshold = 20 + rttAvgPctDiffThreshold = 5 + } + + return (concDiffPct > 10 && concDiff > 5) && (uint64(rttAvgDiff) > rttAvgDiffThreshold || rttAvgPctDiff >= rttAvgPctDiffThreshold) + } + + return false +} + +func (u *HillClimbingUpdater) adjustLimitation(option HillClimbingOption) error { + limitation := float64(u.limiter.limitation.Load()) + oldLimitation := limitation + bestLimitation := float64(u.limiter.bestLimitation.Load()) + alpha := 1.5 * math.Log(limitation) + beta := 0.8 * math.Log(limitation) + logUpdateInterval := math.Log2(float64(u.limiter.updateInterval.Load()) / 1000.0) + + switch option { + case HillClimbingOptionExtendPlus: + limitation += alpha / logUpdateInterval + case HillClimbingOptionExtend: + limitation += beta / logUpdateInterval + case HillClimbingOptionShrinkPlus: + limitation = bestLimitation - alpha/logUpdateInterval + case HillClimbingOptionShrink: + limitation = bestLimitation - beta/logUpdateInterval + } + + limitation = math.Max(1.0, math.Min(limitation, float64(maxLimitation))) + u.limiter.limitation.Store(uint64(limitation)) + VerboseDebugf("[HillClimbingUpdater] The limitation is update from %d to %d.", uint64(oldLimitation), uint64(limitation)) + return nil +} + +func (u *HillClimbingUpdater) shouldDrop(lastUpdatedTime time.Time) (isDropped bool) { + if !u.limiter.lastUpdatedTime.Load().Equal(lastUpdatedTime) { + VerboseDebugf("[HillClimbingUpdater] The limitation is updated by others, drop this update, seq: %d.", u.seq) + isDropped = true + return + } + return +} diff --git a/filter/adaptivesvc/limiter/limiter.go b/filter/adaptivesvc/limiter/limiter.go new file mode 100644 index 0000000000..cdc730ed12 --- /dev/null +++ b/filter/adaptivesvc/limiter/limiter.go @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package limiter + +import ( + "fmt" +) + +var ErrReachLimitation = fmt.Errorf("reach limitation") + +var ( + Verbose = false +) + +const ( + HillClimbingLimiter = iota +) + +type Limiter interface { + Inflight() uint64 + Remaining() uint64 + Acquire() (Updater, error) +} + +type Updater interface { + DoUpdate() error +} diff --git a/filter/adaptivesvc/limiter/utils.go b/filter/adaptivesvc/limiter/utils.go new file mode 100644 index 0000000000..fb446ae26f --- /dev/null +++ b/filter/adaptivesvc/limiter/utils.go @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package limiter + +import ( + "dubbo.apache.org/dubbo-go/v3/common/logger" +) + +//var ( +// decay = 0.95 +// // cpu statistics interval (ms) +// statInterval time.Duration = 500 +// cpu *atomic.Uint64 +//) +// +//func init() { +// cpu = new(atomic.Uint64) +// // get cpu usage statistics regularly +// go cpuStat() +//} +// +//func cpuStat() { +// t := time.NewTicker(time.Microsecond * statInterval) +// +// // prevent cpuStat method from crashing unexpectedly +// defer func() { +// t.Stop() +// if err := recover(); err != nil { +// logger.Warnf("[HillClimbing] cpuStat went down, err: %v, attempting to restart...", err) +// go cpuStat() +// } +// }() +// +// for range t.C { +// stat := +// } +//} +func VerboseDebugf(msg string, args ...interface{}) { + if !Verbose { + return + } + logger.Debugf(msg, args...) +} + +func minUint64(lhs, rhs uint64) uint64 { + if lhs < rhs { + return lhs + } + return rhs +} diff --git a/filter/adaptivesvc/limiter_mapper.go b/filter/adaptivesvc/limiter_mapper.go new file mode 100644 index 0000000000..f4c2aac6aa --- /dev/null +++ b/filter/adaptivesvc/limiter_mapper.go @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package adaptivesvc + +import ( + "fmt" + "sync" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common" + "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc/limiter" +) + +var ( + limiterMapperSingleton *limiterMapper + + ErrLimiterNotFoundOnMapper = fmt.Errorf("limiter not found on mapper") + ErrLimiterTypeNotFound = fmt.Errorf("limiter type not found") +) + +func init() { + limiterMapperSingleton = newLimiterMapper() +} + +type limiterMapper struct { + rwMutex *sync.RWMutex + mapper map[string]limiter.Limiter +} + +func newLimiterMapper() *limiterMapper { + return &limiterMapper{ + rwMutex: new(sync.RWMutex), + mapper: make(map[string]limiter.Limiter), + } +} + +func (m *limiterMapper) newAndSetMethodLimiter(url *common.URL, methodName string, limiterType int) (limiter.Limiter, error) { + key := fmt.Sprintf("%s%s", url.Path, methodName) + + var ( + l limiter.Limiter + ok bool + ) + m.rwMutex.Lock() + if l, ok = limiterMapperSingleton.mapper[key]; ok { + return l, nil + } + switch limiterType { + case limiter.HillClimbingLimiter: + l = limiter.NewHillClimbing() + default: + return nil, ErrLimiterTypeNotFound + } + limiterMapperSingleton.mapper[key] = l + m.rwMutex.Unlock() + return l, nil +} + +func (m *limiterMapper) getMethodLimiter(url *common.URL, methodName string) ( + limiter.Limiter, error) { + key := fmt.Sprintf("%s%s", url.Path, methodName) + m.rwMutex.RLock() + l, ok := limiterMapperSingleton.mapper[key] + m.rwMutex.RUnlock() + if !ok { + return nil, ErrLimiterNotFoundOnMapper + } + return l, nil +} diff --git a/filter/filter_impl/import.go b/filter/filter_impl/import.go index f54bf8250f..015a5da987 100644 --- a/filter/filter_impl/import.go +++ b/filter/filter_impl/import.go @@ -23,6 +23,7 @@ package filter_impl import ( _ "dubbo.apache.org/dubbo-go/v3/filter/accesslog" _ "dubbo.apache.org/dubbo-go/v3/filter/active" + _ "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc" _ "dubbo.apache.org/dubbo-go/v3/filter/auth" _ "dubbo.apache.org/dubbo-go/v3/filter/echo" _ "dubbo.apache.org/dubbo-go/v3/filter/exec_limit" diff --git a/filter/seata/filter_test.go b/filter/seata/filter_test.go index cdbf0a06ba..4db50b0fb3 100644 --- a/filter/seata/filter_test.go +++ b/filter/seata/filter_test.go @@ -43,7 +43,7 @@ func (iv *testMockSeataInvoker) Invoke(ctx context.Context, _ protocol.Invocatio return &protocol.RPCResult{Rest: xid} } } - return &protocol.RPCResult{} + return protocol.NewRPCResult(nil, nil) } func TestSeataFilter_Invoke(t *testing.T) { diff --git a/filter/sentinel/filter.go b/filter/sentinel/filter.go index 2fe2341bcd..4b08cfa49a 100644 --- a/filter/sentinel/filter.go +++ b/filter/sentinel/filter.go @@ -212,10 +212,7 @@ func SetDubboProviderFallback(f DubboFallback) { func getDefaultDubboFallback() DubboFallback { return func(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, blockError *base.BlockError) protocol.Result { - result := &protocol.RPCResult{} - result.SetResult(nil) - result.SetError(blockError) - return result + return protocol.NewRPCResult(nil, blockError) } } diff --git a/imports/imports.go b/imports/imports.go index fb66f0df48..6e0cc1e6f4 100644 --- a/imports/imports.go +++ b/imports/imports.go @@ -18,6 +18,7 @@ package imports import ( + _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/adaptivesvc" _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/available" _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/broadcast" _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/failback" @@ -28,6 +29,7 @@ import ( _ "dubbo.apache.org/dubbo-go/v3/cluster/cluster/zoneaware" _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/consistenthashing" _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/leastactive" + _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/p2c" _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/random" _ "dubbo.apache.org/dubbo-go/v3/cluster/loadbalance/roundrobin" _ "dubbo.apache.org/dubbo-go/v3/cluster/router/v3router" @@ -37,6 +39,7 @@ import ( _ "dubbo.apache.org/dubbo-go/v3/config_center/zookeeper" _ "dubbo.apache.org/dubbo-go/v3/filter/accesslog" _ "dubbo.apache.org/dubbo-go/v3/filter/active" + _ "dubbo.apache.org/dubbo-go/v3/filter/adaptivesvc" _ "dubbo.apache.org/dubbo-go/v3/filter/auth" _ "dubbo.apache.org/dubbo-go/v3/filter/echo" _ "dubbo.apache.org/dubbo-go/v3/filter/exec_limit" diff --git a/metadata/service/local/metadata_service_proxy_factory_test.go b/metadata/service/local/metadata_service_proxy_factory_test.go index 95665baf24..f38d63c3f3 100644 --- a/metadata/service/local/metadata_service_proxy_factory_test.go +++ b/metadata/service/local/metadata_service_proxy_factory_test.go @@ -91,7 +91,5 @@ func (m *mockInvoker) Destroy() { } func (m *mockInvoker) Invoke(context.Context, protocol.Invocation) protocol.Result { - return &protocol.RPCResult{ - Rest: []string{"dubbo://localhost"}, - } + return protocol.NewRPCResult([]string{"dubbo://localhost"}, nil) } diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index 0f23070f36..904bfb036c 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -158,7 +158,6 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati result.Rest = inv.Reply() result.Attrs = rest.Attrs } - logger.Debugf("result.Err: %v, result.Rest: %v", result.Err, result.Rest) return &result } diff --git a/protocol/invoker.go b/protocol/invoker.go index af63dd6702..cb6e8d1287 100644 --- a/protocol/invoker.go +++ b/protocol/invoker.go @@ -19,6 +19,7 @@ package protocol import ( "context" + "fmt" ) import ( @@ -98,3 +99,11 @@ func (bi *BaseInvoker) Destroy() { bi.destroyed.Store(true) bi.available.Store(false) } + +func (bi *BaseInvoker) String() string { + if bi.url != nil { + return fmt.Sprintf("invoker{protocol: %s, host: %s:%s, path: %s}", + bi.url.Protocol, bi.url.Ip, bi.url.Port, bi.url.Path) + } + return fmt.Sprintf("%#v", bi) +} diff --git a/protocol/protocolwrapper/protocol_filter_wrapper.go b/protocol/protocolwrapper/protocol_filter_wrapper.go index 2c24178582..2ad6cb0b85 100644 --- a/protocol/protocolwrapper/protocol_filter_wrapper.go +++ b/protocol/protocolwrapper/protocol_filter_wrapper.go @@ -26,6 +26,7 @@ import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/common/extension" + "dubbo.apache.org/dubbo-go/v3/common/logger" "dubbo.apache.org/dubbo-go/v3/filter" "dubbo.apache.org/dubbo-go/v3/protocol" ) @@ -85,6 +86,14 @@ func BuildInvokerChain(invoker protocol.Invoker, key string) protocol.Invoker { fi := &FilterInvoker{next: next, invoker: invoker, filter: flt} next = fi } + + if key == constant.ServiceFilterKey { + logger.Debugf("[BuildInvokerChain] The provider invocation link is %s, invoker: %s", + strings.Join(append(filterNames, "proxyInvoker"), " -> "), invoker) + } else if key == constant.ReferenceFilterKey { + logger.Debugf("[BuildInvokerChain] The consumer filters are %s, invoker: %s", + strings.Join(append(filterNames, "proxyInvoker"), " -> "), invoker) + } return next } diff --git a/protocol/result.go b/protocol/result.go index a36b16d1cc..34de34ceb6 100644 --- a/protocol/result.go +++ b/protocol/result.go @@ -17,6 +17,10 @@ package protocol +import ( + "fmt" +) + // Result is a RPC result type Result interface { // SetError sets error. @@ -49,6 +53,14 @@ type RPCResult struct { Rest interface{} } +func NewRPCResult(result interface{}, err error) *RPCResult { + return &RPCResult{ + Rest: result, + Err: err, + Attrs: make(map[string]interface{}), + } +} + // SetError sets error. func (r *RPCResult) SetError(err error) { r.Err = err @@ -76,19 +88,33 @@ func (r *RPCResult) SetAttachments(attr map[string]interface{}) { // Attachments gets all attachments func (r *RPCResult) Attachments() map[string]interface{} { + if r.Attrs == nil { + r.Attrs = make(map[string]interface{}) + } return r.Attrs } // AddAttachment adds the specified map to existing attachments in this instance. func (r *RPCResult) AddAttachment(key string, value interface{}) { + if r.Attrs == nil { + r.Attrs = make(map[string]interface{}) + } r.Attrs[key] = value } // Attachment gets attachment by key with default value. func (r *RPCResult) Attachment(key string, defaultValue interface{}) interface{} { + if r.Attrs == nil { + r.Attrs = make(map[string]interface{}) + return nil + } v, ok := r.Attrs[key] if !ok { v = defaultValue } return v } + +func (r *RPCResult) String() string { + return fmt.Sprintf("&RPCResult{Rest: %v, Attrs: %v, Err: %v}", r.Rest, r.Attrs, r.Err) +} diff --git a/remoting/codec.go b/remoting/codec.go index e1d1c8b603..8e09490d4b 100644 --- a/remoting/codec.go +++ b/remoting/codec.go @@ -29,6 +29,7 @@ type Codec interface { } type DecodeResult struct { + // IsRequest indicates whether the current request is a heartbeat request IsRequest bool Result interface{} } diff --git a/remoting/exchange.go b/remoting/exchange.go index 1fda038d1c..aa81689b84 100644 --- a/remoting/exchange.go +++ b/remoting/exchange.go @@ -18,6 +18,7 @@ package remoting import ( + "fmt" "sync" "time" ) @@ -115,6 +116,11 @@ func (response *Response) Handle() { } } +func (response *Response) String() string { + return fmt.Sprintf("&remoting.Response{ID: %d, Version: %s, SerialID: %d, Status: %d, Event: %v, Error: %v, Result: %v}", + response.ID, response.Version, response.SerialID, response.Status, response.Event, response.Error, response.Result) +} + type Options struct { // connect timeout ConnectTimeout time.Duration diff --git a/remoting/exchange_client.go b/remoting/exchange_client.go index 08098f7eb1..f75c6d0640 100644 --- a/remoting/exchange_client.go +++ b/remoting/exchange_client.go @@ -134,6 +134,9 @@ func (client *ExchangeClient) Request(invocation *protocol.Invocation, url *comm result.Rest = resultTmp.Rest result.Attrs = resultTmp.Attrs result.Err = resultTmp.Err + } else { + logger.Warnf("[ExchangeClient.Request] The type of result is unexpected, we want *protocol.RPCResult, "+ + "but we got %T", rsp.response.Result) } return nil } diff --git a/remoting/getty/listener.go b/remoting/getty/listener.go index 78b0f61e0f..337d634f4c 100644 --- a/remoting/getty/listener.go +++ b/remoting/getty/listener.go @@ -102,14 +102,14 @@ func (h *RpcClientHandler) OnClose(session getty.Session) { func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { result, ok := pkg.(remoting.DecodeResult) if !ok { - logger.Errorf("illegal package") + logger.Errorf("[RpcClientHandler.OnMessage] getty client gets an unexpected rpc result: %#v", result) return } // get heartbeat request from server if result.IsRequest { req := result.Result.(*remoting.Request) if req.Event { - logger.Debugf("get rpc heartbeat request{%#v}", req) + logger.Debugf("[RpcClientHandler.OnMessage] getty client gets a heartbeat request: %#v", req) resp := remoting.NewResponse(req.ID, req.Version) resp.Status = hessian.Response_OK resp.Event = req.Event @@ -118,22 +118,23 @@ func (h *RpcClientHandler) OnMessage(session getty.Session, pkg interface{}) { reply(session, resp) return } - logger.Errorf("illegal request but not heartbeat. {%#v}", req) + logger.Errorf("[RpcClientHandler.OnMessage] unexpected heartbeat request: %#v", req) return } h.timeoutTimes = 0 p := result.Result.(*remoting.Response) // get heartbeat if p.Event { - logger.Debugf("get rpc heartbeat response{%#v}", p) + logger.Debugf("[RpcClientHandler.OnMessage] getty client received a heartbeat response: %s", p) if p.Error != nil { - logger.Errorf("rpc heartbeat response{error: %#v}", p.Error) + logger.Errorf("[RpcClientHandler.OnMessage] a heartbeat response received by the getty client "+ + "encounters an error: %v", p.Error) } p.Handle() return } - logger.Debugf("get rpc response{%#v}", p) + logger.Debugf("[RpcClientHandler.OnMessage] getty client received a response: %s", p) h.conn.updateSession(session)