diff --git a/pkg/datasource/opensergo/demo/datasource_opensergo_example.go b/pkg/datasource/opensergo/demo/datasource_opensergo_example.go index d54ed519..d6960558 100644 --- a/pkg/datasource/opensergo/demo/datasource_opensergo_example.go +++ b/pkg/datasource/opensergo/demo/datasource_opensergo_example.go @@ -31,7 +31,7 @@ import ( ) const ( - host string = "33.1.33.1" + host string = "127.0.0.1" port int = 10246 namespace string = "default" @@ -75,7 +75,7 @@ func startFlowModule(counter *Counter) { for i := 0; i < 10; i++ { go func() { for { - e, b := sentinel.Entry("GET:/foo/1/2", sentinel.WithTrafficType(base.Inbound)) + e, b := sentinel.Entry("GET:/foo/1", sentinel.WithTrafficType(base.Inbound)) if b != nil { // Blocked. We could get the block reason from the BlockError. atomic.AddInt64(counter.block, 1) diff --git a/pkg/datasource/opensergo/go.mod b/pkg/datasource/opensergo/go.mod index 32d364cc..0a406812 100644 --- a/pkg/datasource/opensergo/go.mod +++ b/pkg/datasource/opensergo/go.mod @@ -4,7 +4,7 @@ go 1.13 require ( github.com/alibaba/sentinel-golang v1.0.4 - github.com/opensergo/opensergo-go v0.0.0-20220331070310-e5b01fee4d1c + github.com/opensergo/opensergo-go v0.0.0-20221129091737-554d5c0b9105 github.com/pkg/errors v0.9.1 google.golang.org/protobuf v1.27.1 ) diff --git a/pkg/datasource/opensergo/go.sum b/pkg/datasource/opensergo/go.sum index 46ab0b7b..a1ba4e91 100644 --- a/pkg/datasource/opensergo/go.sum +++ b/pkg/datasource/opensergo/go.sum @@ -89,8 +89,9 @@ github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= -github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM39fZuwSd1LwSqqSW0hOdXCYYDX0R3I= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -212,8 +213,8 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= -github.com/opensergo/opensergo-go v0.0.0-20220331070310-e5b01fee4d1c h1:Ivg+PYq7sOf2IwBGbo9Tt6jk+LHSUapWVPBEr9ygh8Q= -github.com/opensergo/opensergo-go v0.0.0-20220331070310-e5b01fee4d1c/go.mod h1:VxL391S2BWXU1m14087xt8+2YTgsnfa+xsSrbuoFKl4= +github.com/opensergo/opensergo-go v0.0.0-20221126135909-3e0f44ba9174 h1:isg3MHnUHgajeWoMOPvjBbzCt+GSgqHIhGpDpI8MB0M= +github.com/opensergo/opensergo-go v0.0.0-20221126135909-3e0f44ba9174/go.mod h1:k5tUtAyJ3mDYZwbcsz29+IiBcPzBQlCQeDxlNgJQNA8= github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= diff --git a/pkg/datasource/opensergo/opensergo.go b/pkg/datasource/opensergo/opensergo.go index 15677e8a..24175856 100644 --- a/pkg/datasource/opensergo/opensergo.go +++ b/pkg/datasource/opensergo/opensergo.go @@ -52,8 +52,7 @@ func NewOpenSergoDataSource(host string, port int, namespace string, app string) } func (ds *OpenSergoDataSource) Close() { - subscribersAll := ds.client.GetSubscriberRegistry().GetSubscribersAll() - subscribersAll.Range(func(key, value interface{}) bool { + ds.client.SubscriberRegistry().RunWithRangeRegistry(func(key, value interface{}) bool { ds.client.UnsubscribeConfig(key.(subscribe.SubscribeKey)) logging.Info("[OpenSergoDatasource] Unsubscribing OpenSergo config.", "SubscribeKey", key) return true @@ -109,8 +108,6 @@ func (ds *OpenSergoDataSource) RegisterSubscribeInfoOfFaulttoleranceRule() { faultToleranceRuleSubscribeInfo := client.NewSubscribeInfo(faultToleranceRuleSubscribeKey) faulttoleranceRuleSubscriber := NewFaulttoleranceRuleSubscriber(ds.opensergoRuleAggregator) faultToleranceRuleSubscribeInfo.AppendSubscriber(faulttoleranceRuleSubscriber) - // log data for test - faultToleranceRuleSubscribeInfo.AppendSubscriber(subscribe.DefaultSubscriber{}) ds.client.RegisterSubscribeInfo(faultToleranceRuleSubscribeInfo) logging.Info("[OpenSergoDatasource] Subscribing OpenSergo base fault-tolerance strategies.", "namespace", ds.namespace, "app", ds.app) } @@ -121,8 +118,6 @@ func (ds *OpenSergoDataSource) RegisterSubscribeInfoOfFlowRuleStrategy() { rateLimitStrategySubscribeInfo := client.NewSubscribeInfo(rateLimitStrategySubscribeKey) rateLimitStrategySubscriber := NewFlowruleStrategySubscriber(ds.opensergoRuleAggregator) rateLimitStrategySubscribeInfo.AppendSubscriber(rateLimitStrategySubscriber) - // log data for test - rateLimitStrategySubscribeInfo.AppendSubscriber(subscribe.DefaultSubscriber{}) ds.client.RegisterSubscribeInfo(rateLimitStrategySubscribeInfo) logging.Info("[OpenSergoDatasource] Subscribing OpenSergo base rate-limit strategies.", "namespace", ds.namespace, "app", ds.app) // TODO register other FlowRule Strategy diff --git a/pkg/datasource/opensergo/opensergo_faulttolerance_rule_subscriber.go b/pkg/datasource/opensergo/opensergo_faulttolerance_rule_subscriber.go index 76c7b88f..cd900600 100644 --- a/pkg/datasource/opensergo/opensergo_faulttolerance_rule_subscriber.go +++ b/pkg/datasource/opensergo/opensergo_faulttolerance_rule_subscriber.go @@ -31,9 +31,10 @@ func NewFaulttoleranceRuleSubscriber(opensergoRuleAggregator *OpensergoRuleAggre } } -func (faulttoleranceRuleSubscriber FaulttoleranceRuleSubscriber) OnSubscribeDataUpdate(subscribeKey subscribe.SubscribeKey, dataSlice []protoreflect.ProtoMessage) (bool, error) { - if reflect.ValueOf(subscribeKey.GetKind()).Interface() != reflect.ValueOf(configkind.ConfigKindRefFaultToleranceRule{}).Interface() { +func (faulttoleranceRuleSubscriber FaulttoleranceRuleSubscriber) OnSubscribeDataUpdate(subscribeKey subscribe.SubscribeKey, data interface{}) (bool, error) { + messages := data.([]protoreflect.ProtoMessage) + if reflect.ValueOf(subscribeKey.Kind()).Interface() != reflect.ValueOf(configkind.ConfigKindRefFaultToleranceRule{}).Interface() { return false, nil } - return faulttoleranceRuleSubscriber.opensergoRuleAggregator.updateFaultToleranceRules(dataSlice) + return faulttoleranceRuleSubscriber.opensergoRuleAggregator.updateFaultToleranceRules(messages) } diff --git a/pkg/datasource/opensergo/opensergo_flowrule_strategy_subscriber.go b/pkg/datasource/opensergo/opensergo_flowrule_strategy_subscriber.go index 67f74c5e..abbf9111 100644 --- a/pkg/datasource/opensergo/opensergo_flowrule_strategy_subscriber.go +++ b/pkg/datasource/opensergo/opensergo_flowrule_strategy_subscriber.go @@ -31,10 +31,11 @@ func NewFlowruleStrategySubscriber(opensergoRuleAggregator *OpensergoRuleAggrega } } -func (flowruleStrategySubscriber FlowruleStrategySubscriber) OnSubscribeDataUpdate(subscribeKey subscribe.SubscribeKey, dataSlice []protoreflect.ProtoMessage) (bool, error) { - switch reflect.ValueOf(subscribeKey.GetKind()).Interface() { +func (flowruleStrategySubscriber FlowruleStrategySubscriber) OnSubscribeDataUpdate(subscribeKey subscribe.SubscribeKey, data interface{}) (bool, error) { + messages := data.([]protoreflect.ProtoMessage) + switch reflect.ValueOf(subscribeKey.Kind()).Interface() { case reflect.ValueOf(configkind.ConfigKindRefRateLimitStrategy{}).Interface(): - return flowruleStrategySubscriber.opensergoRuleAggregator.updateRateLimitStrategy(dataSlice) + return flowruleStrategySubscriber.opensergoRuleAggregator.updateRateLimitStrategy(messages) default: return false, nil }