Skip to content

Commit

Permalink
Add refreshable file datasource.
Browse files Browse the repository at this point in the history
  • Loading branch information
louyuting committed Mar 10, 2020
1 parent 6888afc commit 6e51178
Show file tree
Hide file tree
Showing 8 changed files with 302 additions and 0 deletions.
45 changes: 45 additions & 0 deletions ext/datasource/plugin/ext_flow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package plugin

import (
"encoding/json"
"fmt"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/ext/datasource"
"github.com/pkg/errors"
)

func FlowRulesJsonConverter(src []byte) (interface{}, error) {
if len(src) == 0 {
return nil, nil
}
ret := make([]flow.FlowRule, 0)
err := json.Unmarshal(src, &ret)
if err != nil {
return nil, errors.Errorf("Fail to unmarshal source:%+v to []system.SystemRule, err:%+v", src, err)
}
return ret, nil
}

func FlowRulesUpdater(data interface{}) error {
rules := make([]*flow.FlowRule, 0)
if data == nil {
rules = nil
} else {
val, ok := data.([]flow.FlowRule)
if !ok {
return errors.New(fmt.Sprintf("Fail to type assert data to []*flow.FlowRule, in fact, data: %+v", data))
}
for _, v := range val {
rules = append(rules, &v)
}
}
succ, err := flow.LoadRules(rules)
if succ {
return nil
}
return err
}

func NewFlowRulesHandler() datasource.PropertyHandler {
return datasource.NewDefaultPropertyHandler(FlowRulesJsonConverter, FlowRulesUpdater)
}
45 changes: 45 additions & 0 deletions ext/datasource/plugin/ext_system.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package plugin

import (
"encoding/json"
"fmt"
"github.com/alibaba/sentinel-golang/core/system"
"github.com/alibaba/sentinel-golang/ext/datasource"
"github.com/pkg/errors"
)

func SystemRulesJsonConvert(src []byte) (interface{}, error) {
if len(src) == 0 {
return nil, nil
}
ret := make([]system.SystemRule, 0)
err := json.Unmarshal(src, &ret)
if err != nil {
return nil, errors.Errorf("Fail to unmarshal source:%+v to []system.SystemRule, err:%+v", src, err)
}
return ret, nil
}

func SystemRulesUpdate(data interface{}) error {
rules := make([]*system.SystemRule, 0)
if data == nil {
rules = nil
} else {
val, ok := data.([]system.SystemRule)
if !ok {
return errors.New(fmt.Sprintf("Fail to type assert data to []*flow.FlowRule, in fact, data: %+v", data))
}
for _, v := range val {
rules = append(rules, &v)
}
}
succ, err := system.LoadRules(rules)
if succ {
return nil
}
return err
}

func NewSystemRulesHandler() *datasource.DefaultPropertyHandler {
return datasource.NewDefaultPropertyHandler(SystemRulesJsonConvert, SystemRulesUpdate)
}
126 changes: 126 additions & 0 deletions ext/datasource/refreshable_file/file_datasource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package refreshable_file

import (
"fmt"
"github.com/alibaba/sentinel-golang/ext/datasource"
"github.com/alibaba/sentinel-golang/logging"
"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
"io/ioutil"
"os"
"sync"
"time"
)

var (
logger = logging.GetDefaultLogger()
)

type RefreshableFileDataSource struct {
sourceFilePath string
handlers []datasource.PropertyHandler
once sync.Once
}

func FileDataSourceStarter(sourceFilePath string, handlers ...datasource.PropertyHandler) *RefreshableFileDataSource {
ds := &RefreshableFileDataSource{
sourceFilePath: sourceFilePath,
handlers: handlers,
}
ds.Initialize()
return ds
}

// return idx if existed, else return -1
func (s *RefreshableFileDataSource) indexOfHandler(h datasource.PropertyHandler) int {
for idx, handler := range s.handlers {
if handler == h {
return idx
}
}
return -1
}

func (s *RefreshableFileDataSource) AddPropertyHandler(h datasource.PropertyHandler) {
if s.indexOfHandler(h) < 0 {
return
}
s.handlers = append(s.handlers, h)
}

func (s *RefreshableFileDataSource) RemovePropertyHandler(h datasource.PropertyHandler) {
idx := s.indexOfHandler(h)
if idx < 0 {
return
}
s.handlers = append(s.handlers[:idx], s.handlers[idx+1:]...)
}

func (s *RefreshableFileDataSource) ReadSource() ([]byte, error) {
f, err := os.Open(s.sourceFilePath)
defer f.Close()

if err != nil {
return nil, errors.Errorf("The rules file is not existed, err:%+v.", errors.WithStack(err))
}
src, err := ioutil.ReadAll(f)
if err != nil {
return nil, errors.Errorf("Fail to read file, err: %+v.", errors.WithStack(err))
}
return src, nil
}

func (s *RefreshableFileDataSource) Initialize() {
s.doUpdate()
// start watcher
s.once.Do(
func() {
go func() {
watcher, err := fsnotify.NewWatcher()
defer watcher.Close()

if err != nil {
panic(fmt.Sprintf("Fail to new a watcher of fsnotify, err:%+v", err))
}
err = watcher.Add(s.sourceFilePath)
if err != nil {
panic(fmt.Sprintf("Fail add a watcher on file(%s), err:%+v", s.sourceFilePath, err))
}

for {
select {
case ev := <-watcher.Events:
if ev.Op&fsnotify.Write == fsnotify.Write {
s.doUpdate()
}

if ev.Op&fsnotify.Remove == fsnotify.Remove || ev.Op&fsnotify.Rename == fsnotify.Rename {
logger.Errorf("The file source(%s) was removed or renamed.", s.sourceFilePath)
return
}
case err := <-watcher.Errors:
logger.Errorf("Watch err on file(%s), err:", s.sourceFilePath, err)
time.Sleep(time.Second * 3)
}
}
}()
})
}

func (s *RefreshableFileDataSource) doUpdate() {
src, err := s.ReadSource()
if err!= nil {
logger.Errorf("%+v", err)
return
}
for _, h := range s.handlers {
err := h.Handle(src)
if err != nil {
logger.Errorf("RefreshableFileDataSource fail to publish rules, handle:%+v; err=%+v.", h, err)
}
}
}

func (s *RefreshableFileDataSource) Close() error {
return nil
}
19 changes: 19 additions & 0 deletions ext/datasource/refreshable_file/file_datasource_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package refreshable_file

import (
"github.com/alibaba/sentinel-golang/ext/datasource/plugin"
"testing"
"time"
)

func TestNewFileDataSource_FlowRule(t *testing.T) {
ds := FileDataSourceStarter("../../../tests/testdata/extension/refreshable_file/FlowRule.json", plugin.NewFlowRulesHandler())
time.Sleep(5 * time.Second)
ds.Close()
}

func TestNewFileDataSource_SystemRule(t *testing.T) {
ds := FileDataSourceStarter("../../../tests/testdata/extension/refreshable_file/SystemRule.json", plugin.NewSystemRulesHandler())
time.Sleep(5 * time.Second)
ds.Close()
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ require (
golang.org/x/sys v0.0.0-20200107162124-548cf772de50 // indirect
gopkg.in/yaml.v2 v2.2.2
go.uber.org/multierr v1.5.0
github.com/fsnotify/fsnotify v1.4.7
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ github.com/envoyproxy/protoc-gen-validate v0.0.14/go.mod h1:iSmxcyjqTsJpI2R4NaDN
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239/go.mod h1:Gdwt2ce0yfBxPvZrHkprdPPTTS3N5rwmLE8T22KBXlw=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/structs v0.0.0-20180123065059-ebf56d35bba7/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
Expand Down Expand Up @@ -337,6 +338,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20170807180024-9a379c6b3e95/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down
47 changes: 47 additions & 0 deletions tests/testdata/extension/refreshable_file/FlowRule.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
[
{
"id": 0,
"resource": "abc0",
"limitApp": "default",
"grade": 1,
"strategy": 0,
"controlBehavior": 0,
"refResource": "refDefault",
"warmUpPeriodSec": 10,
"maxQueueingTimeMs":1000,
"clusterMode": false,
"clusterConfig": {
"thresholdType": 0
}
},
{
"id": 1,
"resource": "abc1",
"limitApp": "default",
"grade": 1,
"strategy": 0,
"controlBehavior": 0,
"refResource": "refDefault",
"warmUpPeriodSec": 10,
"maxQueueingTimeMs":1000,
"clusterMode": false,
"clusterConfig": {
"thresholdType": 0
}
},
{
"id": 2,
"resource": "abc2",
"limitApp": "default",
"grade": 1,
"strategy": 0,
"controlBehavior": 0,
"refResource": "refDefault",
"warmUpPeriodSec": 10,
"maxQueueingTimeMs":1000,
"clusterMode": false,
"clusterConfig": {
"thresholdType": 0
}
}
]
17 changes: 17 additions & 0 deletions tests/testdata/extension/refreshable_file/SystemRule.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[
{
"id": 0,
"metricType": 0,
"adaptiveStrategy": 0
},
{
"id": 1,
"metricType": 0,
"adaptiveStrategy": 0
},
{
"id": 2,
"metricType": 0,
"adaptiveStrategy": 0
}
]

0 comments on commit 6e51178

Please sign in to comment.