diff --git a/cluster/loadbalance/least_active.go b/cluster/loadbalance/least_active.go new file mode 100644 index 0000000000..d7d3056818 --- /dev/null +++ b/cluster/loadbalance/least_active.go @@ -0,0 +1,101 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// @author yiji@apache.org +package loadbalance + +import ( + "math/rand" +) + +import ( + "github.com/apache/dubbo-go/cluster" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol" +) + +const ( + LeastActive = "leastactive" +) + +func init() { + extension.SetLoadbalance(LeastActive, NewLeastActiveLoadBalance) +} + +type leastActiveLoadBalance struct { +} + +func NewLeastActiveLoadBalance() cluster.LoadBalance { + return &leastActiveLoadBalance{} +} + +func (lb *leastActiveLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker { + count := len(invokers) + if count == 0 { + return nil + } + if count == 1 { + return invokers[0] + } + + var ( + leastActive int32 = -1 // The least active value of all invokers + totalWeight int64 = 0 // The number of invokers having the same least active value (LEAST_ACTIVE) + firstWeight int64 = 0 // Initial value, used for comparision + leastIndexes = make([]int, count) // The index of invokers having the same least active value (LEAST_ACTIVE) + leastCount = 0 // The number of invokers having the same least active value (LEAST_ACTIVE) + sameWeight = true // Every invoker has the same weight value? + ) + + for i := 0; i < count; i++ { + invoker := invokers[i] + // Active number + active := protocol.GetStatus(invoker.GetUrl(), invocation.MethodName()).GetActive() + // current weight (maybe in warmUp) + weight := GetWeight(invoker, invocation) + // There are smaller active services + if leastActive == -1 || active < leastActive { + leastActive = active + leastIndexes[0] = i + leastCount = 1 // next available leastIndex offset + totalWeight = weight + firstWeight = weight + sameWeight = true + } else if active == leastActive { + leastIndexes[leastCount] = i + totalWeight += weight + leastCount++ + + if sameWeight && (i > 0) && weight != firstWeight { + sameWeight = false + } + } + } + + if leastCount == 1 { + return invokers[0] + } + + if !sameWeight && totalWeight > 0 { + offsetWeight := rand.Int63n(totalWeight) + 1 + for i := 0; i < leastCount; i++ { + leastIndex := leastIndexes[i] + offsetWeight -= GetWeight(invokers[i], invocation) + if offsetWeight <= 0 { + return invokers[leastIndex] + } + } + } + + index := leastIndexes[rand.Intn(leastCount)] + return invokers[index] +} diff --git a/cluster/loadbalance/least_active_test.go b/cluster/loadbalance/least_active_test.go new file mode 100644 index 0000000000..c29a2092a1 --- /dev/null +++ b/cluster/loadbalance/least_active_test.go @@ -0,0 +1,67 @@ +package loadbalance + +import ( + "context" + "fmt" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" +) + +func TestLeastActiveSelect(t *testing.T) { + loadBalance := NewLeastActiveLoadBalance() + + var invokers []protocol.Invoker + + url, _ := common.NewURL(context.TODO(), "dubbo://192.168.1.0:20000/org.apache.demo.HelloService") + invokers = append(invokers, protocol.NewBaseInvoker(url)) + i := loadBalance.Select(invokers, &invocation.RPCInvocation{}) + assert.True(t, i.GetUrl().URLEqual(url)) + + for i := 1; i < 10; i++ { + url, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService", i)) + invokers = append(invokers, protocol.NewBaseInvoker(url)) + } + loadBalance.Select(invokers, &invocation.RPCInvocation{}) +} + +func TestLeastActiveByWeight(t *testing.T) { + loadBalance := NewLeastActiveLoadBalance() + + var invokers []protocol.Invoker + loop := 3 + for i := 1; i <= loop; i++ { + url, _ := common.NewURL(context.TODO(), fmt.Sprintf("test%v://192.168.1.%v:20000/org.apache.demo.HelloService?weight=%v", i, i, i)) + invokers = append(invokers, protocol.NewBaseInvoker(url)) + } + + inv := new(invocation.RPCInvocation) + inv.SetMethod("test") + protocol.BeginCount(invokers[2].GetUrl(), inv.MethodName()) + + loop = 10000 + + var ( + firstCount int + secondCount int + ) + + for i := 1; i <= loop; i++ { + invoker := loadBalance.Select(invokers, inv) + if invoker.GetUrl().Protocol == "test1" { + firstCount++ + } else if invoker.GetUrl().Protocol == "test2" { + secondCount++ + } + } + + assert.Equal(t, firstCount+secondCount, loop) +} diff --git a/filter/impl/active_filter.go b/filter/impl/active_filter.go new file mode 100644 index 0000000000..435bfe7488 --- /dev/null +++ b/filter/impl/active_filter.go @@ -0,0 +1,47 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// @author yiji@apache.org +package impl + +import ( + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/filter" + "github.com/apache/dubbo-go/protocol" +) + +const active = "active" + +func init() { + extension.SetFilter(active, GetActiveFilter) +} + +type ActiveFilter struct { +} + +func (ef *ActiveFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + logger.Infof("invoking active filter. %v,%v", invocation.MethodName(), len(invocation.Arguments())) + + protocol.BeginCount(invoker.GetUrl(), invocation.MethodName()) + return invoker.Invoke(invocation) +} + +func (ef *ActiveFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + + protocol.EndCount(invoker.GetUrl(), invocation.MethodName()) + return result +} + +func GetActiveFilter() filter.Filter { + return &ActiveFilter{} +} diff --git a/protocol/RpcStatus.go b/protocol/RpcStatus.go new file mode 100644 index 0000000000..b9f3e6ecb1 --- /dev/null +++ b/protocol/RpcStatus.go @@ -0,0 +1,71 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// @author yiji@apache.org +package protocol + +import ( + "sync" + "sync/atomic" +) + +import ( + "github.com/apache/dubbo-go/common" +) + +var ( + methodStatistics sync.Map // url -> { methodName : RpcStatus} +) + +type RpcStatus struct { + active int32 +} + +func (rpc *RpcStatus) GetActive() int32 { + return atomic.LoadInt32(&rpc.active) +} + +func GetStatus(url common.URL, methodName string) *RpcStatus { + identifier := url.Key() + methodMap, found := methodStatistics.Load(identifier) + if !found { + methodMap = &sync.Map{} + methodStatistics.Store(identifier, methodMap) + } + + methodActive := methodMap.(*sync.Map) + rpcStatus, found := methodActive.Load(methodName) + if !found { + rpcStatus = &RpcStatus{} + methodActive.Store(methodName, rpcStatus) + } + + status := rpcStatus.(*RpcStatus) + return status +} + +func BeginCount(url common.URL, methodName string) { + beginCount0(GetStatus(url, methodName)) +} + +func EndCount(url common.URL, methodName string) { + endCount0(GetStatus(url, methodName)) +} + +// private methods +func beginCount0(rpcStatus *RpcStatus) { + atomic.AddInt32(&rpcStatus.active, 1) +} + +func endCount0(rpcStatus *RpcStatus) { + atomic.AddInt32(&rpcStatus.active, -1) +}