Skip to content

Commit

Permalink
Add refreshable file data-source implementation (#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
louyuting committed Mar 18, 2020
1 parent 9e57090 commit f25ccc1
Show file tree
Hide file tree
Showing 8 changed files with 486 additions and 1 deletion.
36 changes: 36 additions & 0 deletions ext/datasource/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,39 @@ type DataSource interface {
// Close the data source.
io.Closer
}

type Base struct {
handlers []PropertyHandler
}

func (b *Base) Handlers() []PropertyHandler {
return b.handlers
}

// return idx if existed, else return -1
func (b *Base) indexOfHandler(h PropertyHandler) int {
for idx, handler := range b.handlers {
if handler == h {
return idx
}
}
return -1
}

func (b *Base) AddPropertyHandler(h PropertyHandler) {
if h == nil || b.indexOfHandler(h) >= 0 {
return
}
b.handlers = append(b.handlers, h)
}

func (b *Base) RemovePropertyHandler(h PropertyHandler) {
if h == nil {
return
}
idx := b.indexOfHandler(h)
if idx < 0 {
return
}
b.handlers = append(b.handlers[:idx], b.handlers[idx+1:]...)
}
64 changes: 64 additions & 0 deletions ext/datasource/datasource_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package datasource

import (
"github.com/stretchr/testify/assert"
"reflect"
"testing"
)

func TestBase_AddPropertyHandler(t *testing.T) {
t.Run("AddPropertyHandler_nil", func(t *testing.T) {
b := &Base{
handlers: make([]PropertyHandler, 0),
}
b.AddPropertyHandler(nil)
assert.True(t, len(b.handlers) == 0, "Fail to execute the case TestBase_AddPropertyHandler.")
})

t.Run("AddPropertyHandler_Normal", func(t *testing.T) {
b := &Base{
handlers: make([]PropertyHandler, 0),
}
h := &DefaultPropertyHandler{}
b.AddPropertyHandler(h)
assert.True(t, len(b.handlers) == 1 && reflect.DeepEqual(b.handlers[0], h), "Fail to execute the case TestBase_AddPropertyHandler.")
})
}

func TestBase_RemovePropertyHandler(t *testing.T) {
t.Run("TestBase_RemovePropertyHandler_nil", func(t *testing.T) {
b := &Base{
handlers: make([]PropertyHandler, 0),
}
h1 := &DefaultPropertyHandler{}
b.handlers = append(b.handlers, h1)
b.RemovePropertyHandler(nil)
assert.True(t, len(b.handlers) == 1, "The case TestBase_RemovePropertyHandler execute failed.")
})

t.Run("TestBase_RemovePropertyHandler", func(t *testing.T) {
b := &Base{
handlers: make([]PropertyHandler, 0),
}
h1 := &DefaultPropertyHandler{}
b.handlers = append(b.handlers, h1)
b.RemovePropertyHandler(h1)
assert.True(t, len(b.handlers) == 0, "The case TestBase_RemovePropertyHandler execute failed.")
})
}

func TestBase_indexOfHandler(t *testing.T) {
t.Run("TestBase_indexOfHandler", func(t *testing.T) {
b := &Base{
handlers: make([]PropertyHandler, 0),
}
h1 := &DefaultPropertyHandler{}
b.handlers = append(b.handlers, h1)
h2 := &DefaultPropertyHandler{}
b.handlers = append(b.handlers, h2)
h3 := &DefaultPropertyHandler{}
b.handlers = append(b.handlers, h3)

assert.True(t, b.indexOfHandler(h2) == 1, "Fail to execute the case TestBase_indexOfHandler.")
})
}
119 changes: 119 additions & 0 deletions ext/datasource/file/refreshable_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package file

import (
"github.com/alibaba/sentinel-golang/ext/datasource"
"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
"io/ioutil"
"os"
)

var (
logger = logging.GetDefaultLogger()
)

type RefreshableFileDataSource struct {
datasource.Base
sourceFilePath string
isInitialized util.AtomicBool
closeChan chan struct{}
watcher *fsnotify.Watcher
}

func NewFileDataSource(sourceFilePath string, handlers ...datasource.PropertyHandler) *RefreshableFileDataSource {
var ds = &RefreshableFileDataSource{
sourceFilePath: sourceFilePath,
closeChan: make(chan struct{}),
}
for _, h := range handlers {
ds.AddPropertyHandler(h)
}
return ds
}

func (s *RefreshableFileDataSource) ReadSource() ([]byte, error) {
f, err := os.Open(s.sourceFilePath)
if err != nil {
return nil, errors.Errorf("RefreshableFileDataSource fail to open the property file, err: %+v.", err)
}
defer f.Close()

src, err := ioutil.ReadAll(f)
if err != nil {
return nil, errors.Errorf("RefreshableFileDataSource fail to read file, err: %+v.", err)
}
return src, nil
}

func (s *RefreshableFileDataSource) Initialize() error {
if !s.isInitialized.CompareAndSet(false, true) {
return nil
}

err := s.doReadAndUpdate()
if err != nil {
logger.Errorf("Fail to execute doReadAndUpdate, err: %+v", err)
}

w, err := fsnotify.NewWatcher()
if err != nil {
return errors.Errorf("Fail to new a watcher instance of fsnotify, err: %+v", err)
}
err = w.Add(s.sourceFilePath)
if err != nil {
return errors.Errorf("Fail add a watcher on file[%s], err: %+v", s.sourceFilePath, err)
}
s.watcher = w

go util.RunWithRecover(func() {
defer s.watcher.Close()
for {
select {
case ev := <-s.watcher.Events:
if ev.Op&fsnotify.Write == fsnotify.Write {
err := s.doReadAndUpdate()
if err != nil {
logger.Errorf("Fail to execute doReadAndUpdate, err: %+v", err)
}
}

if ev.Op&fsnotify.Remove == fsnotify.Remove || ev.Op&fsnotify.Rename == fsnotify.Rename {
logger.Errorf("The file source[%s] was removed or renamed.", s.sourceFilePath)
for _, h := range s.Handlers() {
err := h.Handle(nil)
if err != nil {
logger.Errorf("RefreshableFileDataSource fail to publish property, err: %+v.", err)
}
}
}
case err := <-s.watcher.Errors:
logger.Errorf("Watch err on file[%s], err: %+v", s.sourceFilePath, err)
case <-s.closeChan:
return
}
}
}, logger)
return nil
}

func (s *RefreshableFileDataSource) doReadAndUpdate() error {
src, err := s.ReadSource()
if err != nil {
return errors.Errorf("Fail to read source, err: %+v", err)
}
for _, h := range s.Handlers() {
err := h.Handle(src)
if err != nil {
return errors.Errorf("RefreshableFileDataSource fail to publish property, err: %+v.", err)
}
}
return nil
}

func (s *RefreshableFileDataSource) Close() error {
s.closeChan <- struct{}{}
logger.Infof("The RefreshableFileDataSource for [%s] had been closed.", s.sourceFilePath)
return nil
}
Loading

0 comments on commit f25ccc1

Please sign in to comment.