From f25ccc1330f8b8ceb3bbbfbd66747cbeaddada41 Mon Sep 17 00:00:00 2001 From: louyuting Date: Wed, 18 Mar 2020 21:04:03 +0800 Subject: [PATCH] Add refreshable file data-source implementation (#86) --- ext/datasource/datasource.go | 36 +++ ext/datasource/datasource_test.go | 64 +++++ ext/datasource/file/refreshable_file.go | 119 +++++++++ ext/datasource/file/refreshable_file_test.go | 247 +++++++++++++++++++ ext/datasource/mock.go | 17 ++ ext/datasource/property_test.go | 2 +- go.mod | 1 + go.sum | 1 + 8 files changed, 486 insertions(+), 1 deletion(-) create mode 100644 ext/datasource/datasource_test.go create mode 100644 ext/datasource/file/refreshable_file.go create mode 100644 ext/datasource/file/refreshable_file_test.go create mode 100644 ext/datasource/mock.go diff --git a/ext/datasource/datasource.go b/ext/datasource/datasource.go index 79c3bcb44..cf32d52c3 100644 --- a/ext/datasource/datasource.go +++ b/ext/datasource/datasource.go @@ -22,3 +22,39 @@ type DataSource interface { // Close the data source. io.Closer } + +type Base struct { + handlers []PropertyHandler +} + +func (b *Base) Handlers() []PropertyHandler { + return b.handlers +} + +// return idx if existed, else return -1 +func (b *Base) indexOfHandler(h PropertyHandler) int { + for idx, handler := range b.handlers { + if handler == h { + return idx + } + } + return -1 +} + +func (b *Base) AddPropertyHandler(h PropertyHandler) { + if h == nil || b.indexOfHandler(h) >= 0 { + return + } + b.handlers = append(b.handlers, h) +} + +func (b *Base) RemovePropertyHandler(h PropertyHandler) { + if h == nil { + return + } + idx := b.indexOfHandler(h) + if idx < 0 { + return + } + b.handlers = append(b.handlers[:idx], b.handlers[idx+1:]...) +} diff --git a/ext/datasource/datasource_test.go b/ext/datasource/datasource_test.go new file mode 100644 index 000000000..2d80c0942 --- /dev/null +++ b/ext/datasource/datasource_test.go @@ -0,0 +1,64 @@ +package datasource + +import ( + "github.com/stretchr/testify/assert" + "reflect" + "testing" +) + +func TestBase_AddPropertyHandler(t *testing.T) { + t.Run("AddPropertyHandler_nil", func(t *testing.T) { + b := &Base{ + handlers: make([]PropertyHandler, 0), + } + b.AddPropertyHandler(nil) + assert.True(t, len(b.handlers) == 0, "Fail to execute the case TestBase_AddPropertyHandler.") + }) + + t.Run("AddPropertyHandler_Normal", func(t *testing.T) { + b := &Base{ + handlers: make([]PropertyHandler, 0), + } + h := &DefaultPropertyHandler{} + b.AddPropertyHandler(h) + assert.True(t, len(b.handlers) == 1 && reflect.DeepEqual(b.handlers[0], h), "Fail to execute the case TestBase_AddPropertyHandler.") + }) +} + +func TestBase_RemovePropertyHandler(t *testing.T) { + t.Run("TestBase_RemovePropertyHandler_nil", func(t *testing.T) { + b := &Base{ + handlers: make([]PropertyHandler, 0), + } + h1 := &DefaultPropertyHandler{} + b.handlers = append(b.handlers, h1) + b.RemovePropertyHandler(nil) + assert.True(t, len(b.handlers) == 1, "The case TestBase_RemovePropertyHandler execute failed.") + }) + + t.Run("TestBase_RemovePropertyHandler", func(t *testing.T) { + b := &Base{ + handlers: make([]PropertyHandler, 0), + } + h1 := &DefaultPropertyHandler{} + b.handlers = append(b.handlers, h1) + b.RemovePropertyHandler(h1) + assert.True(t, len(b.handlers) == 0, "The case TestBase_RemovePropertyHandler execute failed.") + }) +} + +func TestBase_indexOfHandler(t *testing.T) { + t.Run("TestBase_indexOfHandler", func(t *testing.T) { + b := &Base{ + handlers: make([]PropertyHandler, 0), + } + h1 := &DefaultPropertyHandler{} + b.handlers = append(b.handlers, h1) + h2 := &DefaultPropertyHandler{} + b.handlers = append(b.handlers, h2) + h3 := &DefaultPropertyHandler{} + b.handlers = append(b.handlers, h3) + + assert.True(t, b.indexOfHandler(h2) == 1, "Fail to execute the case TestBase_indexOfHandler.") + }) +} diff --git a/ext/datasource/file/refreshable_file.go b/ext/datasource/file/refreshable_file.go new file mode 100644 index 000000000..64a0194c4 --- /dev/null +++ b/ext/datasource/file/refreshable_file.go @@ -0,0 +1,119 @@ +package file + +import ( + "github.com/alibaba/sentinel-golang/ext/datasource" + "github.com/alibaba/sentinel-golang/logging" + "github.com/alibaba/sentinel-golang/util" + "github.com/fsnotify/fsnotify" + "github.com/pkg/errors" + "io/ioutil" + "os" +) + +var ( + logger = logging.GetDefaultLogger() +) + +type RefreshableFileDataSource struct { + datasource.Base + sourceFilePath string + isInitialized util.AtomicBool + closeChan chan struct{} + watcher *fsnotify.Watcher +} + +func NewFileDataSource(sourceFilePath string, handlers ...datasource.PropertyHandler) *RefreshableFileDataSource { + var ds = &RefreshableFileDataSource{ + sourceFilePath: sourceFilePath, + closeChan: make(chan struct{}), + } + for _, h := range handlers { + ds.AddPropertyHandler(h) + } + return ds +} + +func (s *RefreshableFileDataSource) ReadSource() ([]byte, error) { + f, err := os.Open(s.sourceFilePath) + if err != nil { + return nil, errors.Errorf("RefreshableFileDataSource fail to open the property file, err: %+v.", err) + } + defer f.Close() + + src, err := ioutil.ReadAll(f) + if err != nil { + return nil, errors.Errorf("RefreshableFileDataSource fail to read file, err: %+v.", err) + } + return src, nil +} + +func (s *RefreshableFileDataSource) Initialize() error { + if !s.isInitialized.CompareAndSet(false, true) { + return nil + } + + err := s.doReadAndUpdate() + if err != nil { + logger.Errorf("Fail to execute doReadAndUpdate, err: %+v", err) + } + + w, err := fsnotify.NewWatcher() + if err != nil { + return errors.Errorf("Fail to new a watcher instance of fsnotify, err: %+v", err) + } + err = w.Add(s.sourceFilePath) + if err != nil { + return errors.Errorf("Fail add a watcher on file[%s], err: %+v", s.sourceFilePath, err) + } + s.watcher = w + + go util.RunWithRecover(func() { + defer s.watcher.Close() + for { + select { + case ev := <-s.watcher.Events: + if ev.Op&fsnotify.Write == fsnotify.Write { + err := s.doReadAndUpdate() + if err != nil { + logger.Errorf("Fail to execute doReadAndUpdate, err: %+v", err) + } + } + + if ev.Op&fsnotify.Remove == fsnotify.Remove || ev.Op&fsnotify.Rename == fsnotify.Rename { + logger.Errorf("The file source[%s] was removed or renamed.", s.sourceFilePath) + for _, h := range s.Handlers() { + err := h.Handle(nil) + if err != nil { + logger.Errorf("RefreshableFileDataSource fail to publish property, err: %+v.", err) + } + } + } + case err := <-s.watcher.Errors: + logger.Errorf("Watch err on file[%s], err: %+v", s.sourceFilePath, err) + case <-s.closeChan: + return + } + } + }, logger) + return nil +} + +func (s *RefreshableFileDataSource) doReadAndUpdate() error { + src, err := s.ReadSource() + if err != nil { + return errors.Errorf("Fail to read source, err: %+v", err) + } + for _, h := range s.Handlers() { + err := h.Handle(src) + if err != nil { + return errors.Errorf("RefreshableFileDataSource fail to publish property, err: %+v.", err) + } + } + return nil +} + +func (s *RefreshableFileDataSource) Close() error { + s.closeChan <- struct{}{} + logger.Infof("The RefreshableFileDataSource for [%s] had been closed.", s.sourceFilePath) + return nil +} diff --git a/ext/datasource/file/refreshable_file_test.go b/ext/datasource/file/refreshable_file_test.go new file mode 100644 index 000000000..0b76cbe20 --- /dev/null +++ b/ext/datasource/file/refreshable_file_test.go @@ -0,0 +1,247 @@ +package file + +import ( + "github.com/alibaba/sentinel-golang/ext/datasource" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + tmock "github.com/stretchr/testify/mock" + "io/ioutil" + "os" + "reflect" + "strings" + "testing" + "time" +) + +const ( + TestSystemRules = `[ + { + "id": 0, + "metricType": 0, + "adaptiveStrategy": 0 + }, + { + "id": 1, + "metricType": 0, + "adaptiveStrategy": 0 + }, + { + "id": 2, + "metricType": 0, + "adaptiveStrategy": 0 + } +]` +) + +var ( + TestSystemRulesDir = "./" + TestSystemRulesFile = TestSystemRulesDir + "SystemRules.json" +) + +func prepareSystemRulesTestFile() error { + content := []byte(TestSystemRules) + return ioutil.WriteFile(TestSystemRulesFile, content, os.ModePerm) +} + +func deleteSystemRulesTestFile() error { + return os.Remove(TestSystemRulesFile) +} + +func TestRefreshableFileDataSource_ReadSource(t *testing.T) { + t.Run("RefreshableFileDataSource_ReadSource_Nil", func(t *testing.T) { + err := prepareSystemRulesTestFile() + if err != nil { + t.Errorf("Fail to prepare test file, err: %+v", err) + } + + s := &RefreshableFileDataSource{ + sourceFilePath: TestSystemRulesFile + "NotExisted", + } + got, err := s.ReadSource() + assert.True(t, got == nil && err != nil && strings.Contains(err.Error(), "RefreshableFileDataSource fail to open the property file")) + + err = deleteSystemRulesTestFile() + if err != nil { + t.Errorf("Fail to delete test file, err: %+v", err) + } + }) + + t.Run("RefreshableFileDataSource_ReadSource_Normal", func(t *testing.T) { + err := prepareSystemRulesTestFile() + if err != nil { + t.Errorf("Fail to prepare test file, err: %+v", err) + } + + s := &RefreshableFileDataSource{ + sourceFilePath: TestSystemRulesFile, + } + got, err := s.ReadSource() + if err != nil { + t.Errorf("Fail to execute ReadSource, err: %+v", err) + } + assert.True(t, reflect.DeepEqual(got, []byte(TestSystemRules))) + + err = deleteSystemRulesTestFile() + if err != nil { + t.Errorf("Fail to delete test file, err: %+v", err) + } + }) +} + +func TestRefreshableFileDataSource_doReadAndUpdate(t *testing.T) { + t.Run("TestRefreshableFileDataSource_doReadAndUpdate_normal", func(t *testing.T) { + err := prepareSystemRulesTestFile() + if err != nil { + t.Errorf("Fail to prepare test file, err: %+v", err) + } + + s := &RefreshableFileDataSource{ + sourceFilePath: TestSystemRulesFile, + closeChan: make(chan struct{}), + } + mh1 := &datasource.MockPropertyHandler{} + mh1.On("Handle", tmock.Anything).Return(nil) + mh1.On("isPropertyConsistent", tmock.Anything).Return(false) + s.AddPropertyHandler(mh1) + + err = s.doReadAndUpdate() + assert.True(t, err == nil, "Fail to doReadAndUpdate.") + + err = deleteSystemRulesTestFile() + if err != nil { + t.Errorf("Fail to delete test file, err: %+v", err) + } + }) + + t.Run("TestRefreshableFileDataSource_doReadAndUpdate_Handler_err", func(t *testing.T) { + err := prepareSystemRulesTestFile() + if err != nil { + t.Errorf("Fail to prepare test file, err: %+v", err) + } + + s := &RefreshableFileDataSource{ + sourceFilePath: TestSystemRulesFile, + closeChan: make(chan struct{}), + } + mh1 := &datasource.MockPropertyHandler{} + hErr := errors.New("Handle error") + mh1.On("Handle", tmock.Anything).Return(hErr) + mh1.On("isPropertyConsistent", tmock.Anything).Return(false) + s.AddPropertyHandler(mh1) + + err = s.doReadAndUpdate() + assert.True(t, err != nil && strings.Contains(err.Error(), hErr.Error()), "Fail to doReadAndUpdate.") + + err = deleteSystemRulesTestFile() + if err != nil { + t.Errorf("Fail to delete test file, err: %+v", err) + } + }) +} + +func TestRefreshableFileDataSource_Close(t *testing.T) { + t.Run("TestRefreshableFileDataSource_Close", func(t *testing.T) { + err := prepareSystemRulesTestFile() + if err != nil { + t.Errorf("Fail to prepare test file, err: %+v", err) + } + + s := &RefreshableFileDataSource{ + sourceFilePath: TestSystemRulesFile, + closeChan: make(chan struct{}), + } + mh1 := &datasource.MockPropertyHandler{} + mh1.On("Handle", tmock.Anything).Return(nil) + mh1.On("isPropertyConsistent", tmock.Anything).Return(false) + s.AddPropertyHandler(mh1) + + err = s.Initialize() + if err != nil { + t.Errorf("Fail to Initialize datasource, err: %+v", err) + } + + time.Sleep(1 * time.Second) + s.Close() + time.Sleep(1 * time.Second) + e := s.watcher.Add(TestSystemRulesFile) + assert.True(t, e != nil && strings.Contains(e.Error(), "closed")) + + err = deleteSystemRulesTestFile() + if err != nil { + t.Errorf("Fail to delete test file, err: %+v", err) + } + }) +} + +func TestNewFileDataSource_ALL_For_SystemRule(t *testing.T) { + t.Run("TestNewFileDataSource_ALL_For_SystemRule_Write_Event", func(t *testing.T) { + err := prepareSystemRulesTestFile() + if err != nil { + t.Errorf("Fail to prepare test file, err: %+v", err) + } + + mh1 := &datasource.MockPropertyHandler{} + mh1.On("Handle", tmock.Anything).Return(nil) + mh1.On("isPropertyConsistent", tmock.Anything).Return(false) + + ds := NewFileDataSource(TestSystemRulesFile, mh1) + err = ds.Initialize() + if err != nil { + t.Errorf("Fail to initialize the file data source, err: %+v", err) + } + mh1.AssertNumberOfCalls(t, "Handle", 1) + + f, err := os.OpenFile(ds.sourceFilePath, os.O_RDWR|os.O_APPEND|os.O_SYNC, os.ModePerm) + if err != nil { + t.Errorf("Fail to open the property file, err: %+v.", err) + } + defer f.Close() + + f.WriteString("\n" + TestSystemRules) + f.Sync() + time.Sleep(3 * time.Second) + mh1.AssertNumberOfCalls(t, "Handle", 2) + + ds.Close() + time.Sleep(1 * time.Second) + e := ds.watcher.Add(TestSystemRulesFile) + assert.True(t, e != nil && strings.Contains(e.Error(), "closed")) + + err = deleteSystemRulesTestFile() + if err != nil { + t.Errorf("Fail to delete test file, err: %+v", err) + } + }) + + t.Run("TestNewFileDataSource_ALL_For_SystemRule_Remove_Event", func(t *testing.T) { + err := prepareSystemRulesTestFile() + if err != nil { + t.Errorf("Fail to prepare test file, err: %+v", err) + } + + mh1 := &datasource.MockPropertyHandler{} + mh1.On("Handle", tmock.Anything).Return(nil) + mh1.On("isPropertyConsistent", tmock.Anything).Return(false) + + ds := NewFileDataSource(TestSystemRulesFile, mh1) + err = ds.Initialize() + if err != nil { + t.Errorf("Fail to initialize the file data source, err: %+v", err) + } + mh1.AssertNumberOfCalls(t, "Handle", 1) + + err = deleteSystemRulesTestFile() + if err != nil { + t.Errorf("Fail to delete test file, err: %+v", err) + } + + time.Sleep(3 * time.Second) + mh1.AssertNumberOfCalls(t, "Handle", 2) + + ds.Close() + time.Sleep(1 * time.Second) + e := ds.watcher.Add(TestSystemRulesFile) + assert.True(t, e != nil && strings.Contains(e.Error(), "closed")) + }) + +} diff --git a/ext/datasource/mock.go b/ext/datasource/mock.go new file mode 100644 index 000000000..4c5eddf50 --- /dev/null +++ b/ext/datasource/mock.go @@ -0,0 +1,17 @@ +package datasource + +import "github.com/stretchr/testify/mock" + +type MockPropertyHandler struct { + mock.Mock +} + +func (m *MockPropertyHandler) isPropertyConsistent(src interface{}) bool { + args := m.Called(src) + return args.Bool(0) +} + +func (m *MockPropertyHandler) Handle(src []byte) error { + args := m.Called(src) + return args.Error(0) +} diff --git a/ext/datasource/property_test.go b/ext/datasource/property_test.go index 89ab519f8..9bafc348e 100644 --- a/ext/datasource/property_test.go +++ b/ext/datasource/property_test.go @@ -2,8 +2,8 @@ package datasource import ( "encoding/json" - "errors" "github.com/alibaba/sentinel-golang/core/system" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" "io/ioutil" "testing" diff --git a/go.mod b/go.mod index 16f4f05d6..fe20eb305 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.13 require ( github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/apache/dubbo-go v0.1.2-0.20200224151332-dd1a3c24d656 + github.com/fsnotify/fsnotify v1.4.7 github.com/gin-gonic/gin v1.5.0 github.com/go-ole/go-ole v1.2.4 // indirect github.com/pkg/errors v0.8.1 diff --git a/go.sum b/go.sum index c996c6b4f..dc5f5f24f 100644 --- a/go.sum +++ b/go.sum @@ -77,6 +77,7 @@ github.com/envoyproxy/protoc-gen-validate v0.0.14/go.mod h1:iSmxcyjqTsJpI2R4NaDN github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239/go.mod h1:Gdwt2ce0yfBxPvZrHkprdPPTTS3N5rwmLE8T22KBXlw= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/structs v0.0.0-20180123065059-ebf56d35bba7/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=