Skip to content

Commit

Permalink
fix: watcher logic
Browse files Browse the repository at this point in the history
  • Loading branch information
hui.wang committed Jan 6, 2022
1 parent 987bbf2 commit ae1416c
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 60 deletions.
20 changes: 11 additions & 9 deletions kv/xetcd/xetcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,16 @@ func (p *Loader) WatchImplement(ctx context.Context, confPath string, onContentC
wc := clientv3.NewWatcher(p.cli)
defer func() { _ = wc.Close() }()
watchChan := wc.Watch(ctx, confPath, clientv3.WithPrefix())
for resp := range watchChan {
select {
case <-p.Done:
return
default:
go func(pin *Loader, oc kv.ContentChange) {
for resp := range watchChan {
select {
case <-pin.Done:
return
default:
}
for _, ev := range resp.Events {
oc(pin.Name(), (string)(ev.Kv.Key), ev.Kv.Value)
}
}
for _, ev := range resp.Events {
onContentChange(p.Name(), (string)(ev.Kv.Key), ev.Kv.Value)
}
}
}(p, onContentChange)
}
52 changes: 27 additions & 25 deletions kv/xfile/xfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,35 +37,37 @@ func (p *Loader) GetImplement(ctx context.Context, confPath string) ([]byte, err

// WatchImplement 实现common.loaderImplement.WatchImplement
func (p *Loader) WatchImplement(ctx context.Context, confPath string, onContentChange kv.ContentChange) {
watched := false
for {
select {
case <-p.Done:
return
default:
}
if !watched {
if err := p.watcher.Add(confPath); err != nil {
p.CC.OnWatchError(p.Name(), confPath, err)
go func(pin *Loader, oc kv.ContentChange) {
watched := false
for {
select {
case <-pin.Done:
return
default:
}
}
select {
case event := <-p.watcher.Events:
if (event.Op&fsnotify.Write) == fsnotify.Write || (event.Op&fsnotify.Create) == fsnotify.Create {
confPathChanged := strings.ReplaceAll(event.Name, "\\", "/")
if b, err := p.Get(ctx, confPathChanged); err == nil {
if p.IsChanged(confPathChanged, b) {
onContentChange(p.Name(), confPathChanged, b)
}
if !watched {
if err := pin.watcher.Add(confPath); err != nil {
pin.CC.OnWatchError(pin.Name(), confPath, err)
}
}
case err := <-p.watcher.Errors:
select {
case <-p.Done:
return
default:
case event := <-pin.watcher.Events:
if (event.Op&fsnotify.Write) == fsnotify.Write || (event.Op&fsnotify.Create) == fsnotify.Create {
confPathChanged := strings.ReplaceAll(event.Name, "\\", "/")
if b, err := pin.Get(ctx, confPathChanged); err == nil {
if pin.IsChanged(confPathChanged, b) {
oc(pin.Name(), confPathChanged, b)
}
}
}
case err := <-pin.watcher.Errors:
select {
case <-pin.Done:
return
default:
}
pin.CC.OnWatchError(pin.Name(), confPath, err)
}
p.CC.OnWatchError(p.Name(), confPath, err)
}
}
}(p, onContentChange)
}
16 changes: 7 additions & 9 deletions kv/xmem/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@ import (
// Loader mem Loader
type Loader struct {
*kv.Common
dataMutex sync.Mutex
data map[string][]byte
onContentChange kv.ContentChange
confPath string
dataMutex sync.Mutex
data map[string][]byte
onChanged map[string]kv.ContentChange
}

// New new mem Loader
func New(opts ...kv.Option) (p *Loader, err error) {
x := &Loader{data: make(map[string][]byte)}
x := &Loader{data: make(map[string][]byte), onChanged: make(map[string]kv.ContentChange)}
x.Common = kv.New("mem", x, opts...)
return x, nil
}
Expand All @@ -42,8 +41,7 @@ func (p *Loader) GetImplement(ctx context.Context, confPath string) ([]byte, err
func (p *Loader) WatchImplement(ctx context.Context, confPath string, onContentChange kv.ContentChange) {
p.dataMutex.Lock()
defer p.dataMutex.Unlock()
p.onContentChange = onContentChange
p.confPath = confPath
p.onChanged[confPath] = onContentChange
}

// Set 设定数据
Expand All @@ -55,7 +53,7 @@ func (p *Loader) Set(confPath string, data []byte) {
return
}
p.data[confPath] = data
if confPath == p.confPath && p.onContentChange != nil {
p.onContentChange(p.Name(), confPath, data)
if f, ok := p.onChanged[confPath]; ok {
f(p.Name(), confPath, data)
}
}
1 change: 0 additions & 1 deletion tests/all_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ func TestOverideDefaultValue(t *testing.T) {
Convey("with flag support", t, func(c C) {
defaultVal := NewTestConfig()
x := xconf.New(
xconf.WithDebug(true),
xconf.WithReaders(bytes.NewReader([]byte(yamlTest))),
xconf.WithFlagArgs("--http_address=192.168.0.1", "--int64_slice=100,101", "--sub_test.map_not_leaf=k2,2222", "--sub_test.map2=k3,3333"),
xconf.WithEnviron("read_timeout=20s", "map_not_leaf=k3,3333"),
Expand Down
2 changes: 1 addition & 1 deletion xconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func (x *XConf) DumpInfo() {
)
xf := xflag.NewMaker(opts...)
if err := xf.Set(x.zeroValPtrForLayout); err != nil {
fmt.Printf(fmt.Sprintf("got error while xflag Set, err :%s ", err.Error()))
fmt.Printf("got error while xflag Set, err :%s\n", err.Error())
return
}

Expand Down
18 changes: 3 additions & 15 deletions xconf_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"path/filepath"
"time"

"github.com/sandwich-go/xconf/kv"
)
Expand All @@ -25,17 +24,6 @@ func (x *XConf) WatchFieldPath(fieldPath string, changed OnFieldUpdated) {
x.mapOnFieldUpdated[fieldPath] = changed
}

func (x *XConf) autoRecover(tag string, f func()) {
defer func() {
if reason := recover(); reason != nil {
x.cc.LogWarning(fmt.Sprintf("autoRecover %s panic with reason:%v retry after 5 second", tag, reason))
time.Sleep(time.Second * 5)
go x.autoRecover(tag, f)
}
}()
f()
}

// WatchUpdate confPath不会自动绑定env value,如果需要watch的路径与环境变量相关,先通过ParseEnvValue自行解析替换处理错误
func (x *XConf) WatchUpdate(confPath string, loader kv.Loader) {
k := &kvLoader{
Expand All @@ -50,9 +38,8 @@ func (x *XConf) WatchUpdate(confPath string, loader kv.Loader) {
x.cc.LogWarning(fmt.Sprintf("name:%s confPath:%s watch got error:%s", name, confPath, err))
})
}
go x.autoRecover(fmt.Sprintf("watch_with_%s_%s"+loader.Name(), confPath), func() {
k.Watch(context.TODO(), confPath, x.onContentChanged)
})
// 需要Loader自行维护异步逻辑
k.Watch(context.TODO(), k.confPath, x.onContentChanged)
}

func (x *XConf) notifyChanged() error {
Expand Down Expand Up @@ -81,6 +68,7 @@ func (x *XConf) notifyChanged() error {
func (x *XConf) onContentChanged(name string, confPath string, content []byte) {
x.dynamicUpdate.Lock()
defer x.dynamicUpdate.Unlock()

x.cc.LogDebug(fmt.Sprintf("got update:%s", confPath))
defer func() {
if reason := recover(); reason == nil {
Expand Down

0 comments on commit ae1416c

Please sign in to comment.