Skip to content

Commit

Permalink
add logic of mixed rule_type which is used to adapt to shared mode of…
Browse files Browse the repository at this point in the history
… OpensergoClient.
  • Loading branch information
jnan806 committed Oct 17, 2022
1 parent d205c38 commit 1dd4b2d
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 26 deletions.
6 changes: 3 additions & 3 deletions pkg/datasource/opensergo/demo/datasource_opensergo_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/ext/datasource"
_ "github.com/alibaba/sentinel-golang/ext/datasource"
"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/pkg/datasource/opensergo"
Expand Down Expand Up @@ -47,10 +46,11 @@ type Counter struct {

func main() {

handler := datasource.NewFlowRulesHandler(datasource.FlowRuleJsonArrayParser)
openSergoDataSource, _ := opensergo.NewOpenSergoDataSource(host, port, namespace, app, handler)
openSergoDataSource, _ := opensergo.NewOpenSergoDataSource(host, port, namespace, app)
openSergoDataSource.Initialize()
openSergoDataSource.Start()
openSergoDataSource.RegisterSubscribeInfoOfFaulttoleranceRule()
openSergoDataSource.RegisterSubscribeInfoOfFlowRuleStrategy()

// for test High-Traffic-Scenario
test()
Expand Down
98 changes: 98 additions & 0 deletions pkg/datasource/opensergo/mixed_property_adaptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 1999-2020 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package opensergo

import (
"encoding/json"
"fmt"
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/core/hotspot"
"github.com/alibaba/sentinel-golang/core/isolation"
"github.com/alibaba/sentinel-golang/core/system"
"github.com/alibaba/sentinel-golang/ext/datasource"
)

type MixedRule struct {
FlowRule []flow.Rule
HotSpotParamFlowRule []hotspot.Rule
CircuitBreakerRule []circuitbreaker.Rule
SystemRule []system.Rule
IsolationRule []isolation.Rule
}

// MixedPropertyJsonArrayParser provide JSON as the default serialization for MixedRule
func MixedPropertyJsonArrayParser(src []byte) (interface{}, error) {
mixedRules := new(MixedRule)
if err := json.Unmarshal(src, mixedRules); err != nil {
desc := fmt.Sprintf("Fail to convert source bytes to []*opensergo.MixedRule, err: %s", err.Error())
return nil, datasource.NewError(datasource.ConvertSourceError, desc)
}
return mixedRules, nil
}

// MixedPropertyUpdater load the newest MixedRule to downstream flow component.
func MixedPropertyUpdater(data interface{}) error {
mixedRule := data.(*MixedRule)

var errSlice []error
flowRules := mixedRule.FlowRule
if flowRules != nil {
if err := datasource.FlowRulesUpdater(flowRules); err != nil {
errSlice = append(errSlice, err)
}
}

hotSpotParamFlowRule := mixedRule.HotSpotParamFlowRule
if hotSpotParamFlowRule != nil {
if err := datasource.HotSpotParamRulesUpdater(hotSpotParamFlowRule); err != nil {
errSlice = append(errSlice, err)
}
}

circuitBreakerRule := mixedRule.CircuitBreakerRule
if circuitBreakerRule != nil {
if err := datasource.CircuitBreakerRulesUpdater(circuitBreakerRule); err != nil {
errSlice = append(errSlice, err)
}
}

systemRules := mixedRule.SystemRule
if systemRules != nil {
if err := datasource.SystemRulesUpdater(systemRules); err != nil {
errSlice = append(errSlice, err)
}
}

isolationRule := mixedRule.IsolationRule
if isolationRule != nil {
if err := datasource.IsolationRulesUpdater(isolationRule); err != nil {
errSlice = append(errSlice, err)
}
}

if errSlice == nil || len(errSlice) == 0 {
return nil
}

var errStr string
for _, err := range errSlice {
errStr = fmt.Sprintf(" | ") + fmt.Sprintf("%+v", err)
}
return datasource.NewError(
datasource.UpdatePropertyError,
fmt.Sprintf(errStr),
)
}
32 changes: 13 additions & 19 deletions pkg/datasource/opensergo/opensergo.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type OpenSergoDataSource struct {
opensergoRuleAggregator *OpensergoRuleAggregator
}

func NewOpenSergoDataSource(host string, port int, namespace string, app string, handlers ...datasource.PropertyHandler) (*OpenSergoDataSource, error) {
func NewOpenSergoDataSource(host string, port int, namespace string, app string) (*OpenSergoDataSource, error) {
if len(namespace) == 0 || len(app) == 0 {
return nil, errors.New(fmt.Sprintf("invalid parameters, namespace: %s, app: %s", namespace, app))
}
Expand All @@ -46,19 +46,16 @@ func NewOpenSergoDataSource(host string, port int, namespace string, app string,
opensergoRuleAggregator: NewOpensergoRuleAggregator(),
}

for _, h := range handlers {
ds.AddPropertyHandler(h)
}
ds.AddPropertyHandler(datasource.NewDefaultPropertyHandler(MixedPropertyJsonArrayParser, MixedPropertyUpdater))

return ds, nil
}

func (ds OpenSergoDataSource) Start() {
func (ds *OpenSergoDataSource) Start() {
ds.client.Start()
ds.registerSubscribeInfo()
}

func (ds OpenSergoDataSource) Close() {
func (ds *OpenSergoDataSource) Close() {
subscribersAll := ds.client.GetSubscriberRegistry().GetSubscribersAll()
subscribersAll.Range(func(key, value interface{}) bool {
ds.client.UnsubscribeConfig(key.(subscribe.SubscribeKey))
Expand Down Expand Up @@ -86,23 +83,14 @@ func (ds OpenSergoDataSource) doUpdate() {
func (ds OpenSergoDataSource) ReadSource() ([]byte, error) {
dataMap := ds.opensergoRuleAggregator.dataMap
logging.Info("[OpenSergo] Succeed to read source", "namespace", ds.namespace, "app", ds.app, "content", dataMap)
bytes, err := json.Marshal(dataMap[RuleType_FlowRule])
bytes, err := json.Marshal(dataMap)
if err != nil {
return nil, err
}
return bytes, nil
}

// registerSubscribeInfo
//
// registry the subscribeInfo which would subscribe the data that sentinel focus on.
func (ds OpenSergoDataSource) registerSubscribeInfo() {
ds.registerSubscribeInfoOfFaulttoleranceRule()
ds.registerSubscribeInfoOfFlowRuleStrategy()
}

func (ds OpenSergoDataSource) registerSubscribeInfoOfFaulttoleranceRule() {

func (ds *OpenSergoDataSource) RegisterSubscribeInfoOfFaulttoleranceRule() {
// registry SubscribeInfo of FaultToleranceRule
faultToleranceRuleSubscribeKey := subscribe.NewSubscribeKey(ds.namespace, ds.app, configkind.ConfigKindRefFaultToleranceRule{})
faultToleranceRuleSubscribeInfo := client.NewSubscribeInfo(*faultToleranceRuleSubscribeKey)
Expand All @@ -114,7 +102,7 @@ func (ds OpenSergoDataSource) registerSubscribeInfoOfFaulttoleranceRule() {
logging.Info(fmt.Sprintf("Subscribing OpenSergo base fault-tolerance rules for target <%v, %v>", ds.namespace, ds.app))
}

func (ds OpenSergoDataSource) registerSubscribeInfoOfFlowRuleStrategy() {
func (ds *OpenSergoDataSource) RegisterSubscribeInfoOfFlowRuleStrategy() {
// registry SubscribeInfo of RateLimitStrategy
rateLimitStrategySubscribeKey := subscribe.NewSubscribeKey(ds.namespace, ds.app, configkind.ConfigKindRefRateLimitStrategy{})
rateLimitStrategySubscribeInfo := client.NewSubscribeInfo(*rateLimitStrategySubscribeKey)
Expand All @@ -126,3 +114,9 @@ func (ds OpenSergoDataSource) registerSubscribeInfoOfFlowRuleStrategy() {
logging.Info(fmt.Sprintf("Subscribing OpenSergo base rate-limit strategies for target <%v, %v>", ds.namespace, ds.app))
// TODO register other FlowRule Strategy
}

// NOTE: unsubscribe operation does not affect existing rules in SentinelProperty.
func (ds *OpenSergoDataSource) unSubscribeFlowRuleStrategy() {
rateLimitStrategySubscribeKey := subscribe.NewSubscribeKey(ds.namespace, ds.app, configkind.ConfigKindRefRateLimitStrategy{})
ds.client.UnsubscribeConfig(*rateLimitStrategySubscribeKey)
}
9 changes: 5 additions & 4 deletions pkg/datasource/opensergo/opensergo_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
package opensergo

const (
RuleType_FlowRule string = "FlowRule"
RuleType_CircuitBreakerRule string = "DegradeRule"
RuleType_SystemAdaptiveRule string = "SystemRule"
RuleType_ParamFlowRule string = "ParamFlowRule"
RuleType_FlowRule string = "FlowRule"
RuleType_CircuitBreakerRule string = "CircuitBreakerRule"
RuleType_SystemAdaptiveRule string = "SystemRule"
RuleType_HotSpotParamFlowRule string = "HotSpotParamFlowRule"
RuleType_IsolationRule string = "IsolationRule"
)

0 comments on commit 1dd4b2d

Please sign in to comment.