Skip to content

Commit

Permalink
adaptor opensergo logic to sentinel parser and updater, and polish so…
Browse files Browse the repository at this point in the history
…me name-format
  • Loading branch information
jnan806 committed Oct 18, 2022
1 parent 9a38e9e commit 7bcc31e
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 39 deletions.
15 changes: 6 additions & 9 deletions pkg/datasource/opensergo/demo/datasource_opensergo_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,21 @@ type Counter struct {
}

func main() {

openSergoDataSource, _ := opensergo.NewOpenSergoDataSource(host, port, namespace, app)
openSergoDataSource.Initialize()
openSergoDataSource.Start()
openSergoDataSource.RegisterSubscribeInfoOfFaulttoleranceRule()
openSergoDataSource.RegisterSubscribeInfoOfFlowRuleStrategy()

// for test High-Traffic-Scenario
test()
// simulate concurrency request
simulateConcurrency()

select {}
}

func test() {
func simulateConcurrency() {
counter := Counter{pass: new(int64), block: new(int64), total: new(int64)}
startFlowModule(&counter)

//Starting counter
// simulate request
go startFlowModule(&counter)
// print counter
go timerTask(&counter)
}

Expand Down
31 changes: 22 additions & 9 deletions pkg/datasource/opensergo/opensergo.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ func NewOpenSergoDataSource(host string, port int, namespace string, app string)
return ds, nil
}

func (ds *OpenSergoDataSource) Start() {
ds.client.Start()
}

func (ds *OpenSergoDataSource) Close() {
subscribersAll := ds.client.GetSubscriberRegistry().GetSubscribersAll()
subscribersAll.Range(func(key, value interface{}) bool {
Expand All @@ -66,9 +62,19 @@ func (ds *OpenSergoDataSource) Close() {

// Initialize
//
// set the handler for sentinel, to update sentinel local cache when the data from opensego was changed.
// 1. Set the handler for sentinel, to update sentinel local cache when the data from opensego was changed.
//
// 2. Start the NewOpenSergoClient.
//
// 3. Resister OpenSergo Subscribers by params
func (ds *OpenSergoDataSource) Initialize() error {
ds.opensergoRuleAggregator.setSentinelUpdateHandler(ds.doUpdate)
ds.client.Start()

// TODO to add datasource-params in NewOpenSergoDataSource to decide register which subscribers for datasource
// TODO add the deciding logic in follow
ds.RegisterSubscribeInfoOfFaulttoleranceRule()
ds.RegisterSubscribeInfoOfFlowRuleStrategy()
return nil
}

Expand All @@ -77,13 +83,20 @@ func (ds OpenSergoDataSource) doUpdate() {
if err != nil {
logging.Warn("[OpenSergo] Succeed to read source in Initialize()", "namespace", ds.namespace, "app", ds.app, "content", fmt.Sprintf(string(bytes)))
}

ds.Handle(bytes)
}

func (ds OpenSergoDataSource) ReadSource() ([]byte, error) {
dataMap := ds.opensergoRuleAggregator.dataMap
logging.Info("[OpenSergo] Succeed to read source", "namespace", ds.namespace, "app", ds.app, "content", dataMap)
bytes, err := json.Marshal(dataMap)
func (ds *OpenSergoDataSource) ReadSource() ([]byte, error) {
// assemble updated MixedRule
mixedRule := new(MixedRule)
if ds.opensergoRuleAggregator.mixedRuleCache.updateFlagMap[RuleType_FlowRule] {
mixedRule.FlowRule = ds.opensergoRuleAggregator.mixedRuleCache.FlowRule
}
// TODO assembler other rule-type

logging.Info("[OpenSergo] Succeed to read source", "namespace", ds.namespace, "app", ds.app, "content", mixedRule)
bytes, err := json.Marshal(mixedRule)
if err != nil {
return nil, err
}
Expand Down
72 changes: 51 additions & 21 deletions pkg/datasource/opensergo/opensergo_rule_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,52 @@ import (
"sync"
)

type MixedRuleCache struct {
MixedRule
// map[ruleType] bool, update change status of ruleType
updateFlagMap map[string]bool
// map[resourceName] index, flow.Rule index of mixedRuleCache by resourceName, used to update flow.Rule in mixedRuleCache
flowRuleNameMap map[string]int
}

func newMixedRuleCache() *MixedRuleCache {
return &MixedRuleCache{
updateFlagMap: make(map[string]bool),
}
}

type OpensergoRuleAggregator struct {
// map[kindName] []flow.Rule
dataMap map[string][]flow.Rule
ruleAssembler RuleAssemblerAggregator
sentinelUpdateHandler func()

mixedRuleCache *MixedRuleCache

// map[kindName] []v1.FaultToleranceRule
pbTtRuleMapByStrategyKind map[string][]faulttolerancePb.FaultToleranceRule
// map[kindName] []v1.RateLimitStrategy
pbRlStrategyMap map[string]faulttolerancePb.RateLimitStrategy

ruleAssembler RuleAssemblerAggregator
sentinelUpdateHandler func()
}

func NewOpensergoRuleAggregator() *OpensergoRuleAggregator {
return &OpensergoRuleAggregator{
dataMap: make(map[string][]flow.Rule),
ruleAssembler: RuleAssemblerAggregator{},
mixedRuleCache: newMixedRuleCache(),

pbTtRuleMapByStrategyKind: make(map[string][]faulttolerancePb.FaultToleranceRule),

pbRlStrategyMap: make(map[string]faulttolerancePb.RateLimitStrategy),
ruleAssembler: RuleAssemblerAggregator{},
}
}

func (aggregator *OpensergoRuleAggregator) setSentinelUpdateHandler(sentinelUpdateHandler func()) {
aggregator.sentinelUpdateHandler = sentinelUpdateHandler
}

var updateFaultToleranceRules_Mutex sync.Mutex
var updateFaultToleranceRulesMutex sync.Mutex

func (aggregator *OpensergoRuleAggregator) updateFaultToleranceRules(dataSlice []protoreflect.ProtoMessage) (bool, error) {
updateFaultToleranceRules_Mutex.Lock()
updateFaultToleranceRulesMutex.Lock()
defer updateFaultToleranceRulesMutex.Unlock()
for _, pbData := range dataSlice {
pbFaultToleranceRule := pbData.(*faulttolerancePb.FaultToleranceRule)
for _, strategyRef := range pbFaultToleranceRule.GetStrategies() {
Expand All @@ -68,14 +83,14 @@ func (aggregator *OpensergoRuleAggregator) updateFaultToleranceRules(dataSlice [

aggregator.updateFlowRule()
//aggregator.updateCircuitBreakerRule()
updateFaultToleranceRules_Mutex.Unlock()
return true, nil
}

var updateRateLimitStrategy_mutex sync.Mutex
var updateRateLimitStrategyMutex sync.Mutex

func (aggregator *OpensergoRuleAggregator) updateRateLimitStrategy(dataSlice []protoreflect.ProtoMessage) (bool, error) {
updateRateLimitStrategy_mutex.Lock()
updateRateLimitStrategyMutex.Lock()
defer updateRateLimitStrategyMutex.Unlock()
if len(dataSlice) > 0 {
for _, pbData := range dataSlice {
rateLimitStrategy := pbData.(*faulttolerancePb.RateLimitStrategy)
Expand All @@ -84,27 +99,42 @@ func (aggregator *OpensergoRuleAggregator) updateRateLimitStrategy(dataSlice []p
}

aggregator.updateFlowRule()
updateRateLimitStrategy_mutex.Unlock()
return true, nil
}

var updateFlowRule_mutex sync.Mutex

// TODO update all flow.Rule now, but in this mode, the performance would be affected when the data becoming large.
func (aggregator *OpensergoRuleAggregator) updateFlowRule() {
updateFlowRule_mutex.Lock()
flowRules := make([]flow.Rule, 0)
// assembler RateLimitStrategies for FlowRule
pbRuleOfRateLimitStrategies := aggregator.pbTtRuleMapByStrategyKind[configkind.ConfigKindRefRateLimitStrategy{}.GetSimpleName()]
flowRulesByRlStrategy := aggregator.ruleAssembler.assembleFlowRulesFromRateLimitStrategies(pbRuleOfRateLimitStrategies, aggregator.pbRlStrategyMap)
if flowRulesByRlStrategy != nil && len(flowRulesByRlStrategy) > 0 {
flowRules = append(flowRules, flowRulesByRlStrategy...)
}
// TODO update
aggregator.dataMap[RuleType_FlowRule] = flowRules
// TODO assembler other flowRule strategies

// merge flowRule between cache-data and new-data
for _, rule := range flowRules {
flowRuleIndex := aggregator.mixedRuleCache.flowRuleNameMap[rule.ResourceName()]
// if existed then update
// else append
if flowRuleIndex > 0 || (flowRuleIndex == 0 && len(aggregator.mixedRuleCache.FlowRule) == 1) {
aggregator.mixedRuleCache.FlowRule[flowRuleIndex] = rule
} else {
if aggregator.mixedRuleCache.FlowRule == nil {
aggregator.mixedRuleCache.FlowRule = make([]flow.Rule, 0)
aggregator.mixedRuleCache.flowRuleNameMap = make(map[string]int)
}

aggregator.mixedRuleCache.FlowRule = append(aggregator.mixedRuleCache.FlowRule, rule)
}
aggregator.mixedRuleCache.flowRuleNameMap[rule.ResourceName()] = len(aggregator.mixedRuleCache.FlowRule) - 1
}

aggregator.mixedRuleCache.updateFlagMap[RuleType_FlowRule] = true
aggregator.sentinelUpdateHandler()
updateFlowRule_mutex.Unlock()
aggregator.mixedRuleCache.updateFlagMap[RuleType_FlowRule] = false
}

func (aggregator *OpensergoRuleAggregator) updateCircuitBreakerRule() {

// TODO add logic of updateCircuitBreakerRule
}

0 comments on commit 7bcc31e

Please sign in to comment.