-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Use fsnotify to detect model/policy files change in casbin plugin #614
Changes from 3 commits
66d0cb7
fe5182e
1bfb1d4
3120753
7bf2834
d0dc175
17ed368
99bf29d
57329f4
6eb6b03
5c99c69
846ff5b
f330ec6
29822b8
a7bf59d
6719586
daf2c24
7120888
c2ffc2e
46d90e6
2b01940
c1522d2
c7cdd4a
4c686e2
cc84ebd
5e32bea
930b568
1eaa4e7
e34511d
7a1551f
bbefd66
f239cf5
75d00da
b9d7d06
0b03da5
8ca375a
363dd1d
b81b325
c879870
ac85bd9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -15,11 +15,13 @@ | |||
package file | ||||
|
||||
import ( | ||||
"errors" | ||||
"fmt" | ||||
"os" | ||||
"sync" | ||||
"time" | ||||
|
||||
"github.com/jellydator/ttlcache/v3" | ||||
"github.com/fsnotify/fsnotify" | ||||
|
||||
"mosn.io/htnn/api/pkg/log" | ||||
) | ||||
|
@@ -48,65 +50,87 @@ | |||
f.lock.Unlock() | ||||
} | ||||
|
||||
type fs struct { | ||||
cache *ttlcache.Cache[string, os.FileInfo] | ||||
type Fsnotify struct { | ||||
Watcher *fsnotify.Watcher | ||||
} | ||||
|
||||
func newFS(ttl time.Duration) *fs { | ||||
loader := ttlcache.LoaderFunc[string, os.FileInfo]( | ||||
func(c *ttlcache.Cache[string, os.FileInfo], key string) *ttlcache.Item[string, os.FileInfo] { | ||||
info, err := os.Stat(key) | ||||
if err != nil { | ||||
logger.Error(err, "reload file info to cache", "file", key) | ||||
return nil | ||||
} | ||||
item := c.Set(key, info, ttlcache.DefaultTTL) | ||||
logger.Info("update file mtime", "file", key, "mtime", item.Value().ModTime()) | ||||
return item | ||||
}, | ||||
) | ||||
cache := ttlcache.New( | ||||
ttlcache.WithTTL[string, os.FileInfo](ttl), | ||||
ttlcache.WithLoader[string, os.FileInfo](loader), | ||||
) | ||||
go cache.Start() | ||||
|
||||
return &fs{ | ||||
cache: cache, | ||||
func newFsnotify() (fs *Fsnotify) { | ||||
watcher, err := fsnotify.NewWatcher() | ||||
if err != nil { | ||||
logger.Error(err, "create watcher failed") | ||||
return | ||||
} | ||||
fs = &Fsnotify{ | ||||
Watcher: watcher, | ||||
} | ||||
return | ||||
} | ||||
|
||||
var ( | ||||
// TODO: rewrite it to use inotify | ||||
defaultFs = newFS(10 * time.Second) | ||||
defaultFsnotify = newFsnotify() | ||||
) | ||||
|
||||
func IsChanged(files ...*File) bool { | ||||
func Update(onChange func(), files ...*File) (err error) { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Update is redundant, use WatchFiles is enough. |
||||
err = WatchFiles(onChange, files...) | ||||
return | ||||
} | ||||
|
||||
func WatchFiles(onChange func(), files ...*File) (err error) { | ||||
if len(files) < 1 { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about using param |
||||
err = errors.New("must specify at least one file to watch") | ||||
logger.Error(err, "") | ||||
return | ||||
} | ||||
|
||||
watcher := newFsnotify().Watcher | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why don't we use defaultFsnotify? |
||||
if err != nil { | ||||
|
||||
spacewander marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
logger.Error(err, "failed to create watcher") | ||||
return | ||||
} | ||||
|
||||
// Add files to watcher. | ||||
for _, file := range files { | ||||
changed := defaultFs.isChanged(file) | ||||
if changed { | ||||
return true | ||||
} | ||||
go defaultFsnotify.watchFiles(onChange, watcher, file) | ||||
} | ||||
return false | ||||
|
||||
return | ||||
spacewander marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
} | ||||
|
||||
func (f *fs) isChanged(file *File) bool { | ||||
item := f.cache.Get(file.Name) | ||||
if item == nil { | ||||
// As a protection, failed to fetch the real file means file not changed | ||||
return false | ||||
func (f *Fsnotify) watchFiles(onChange func(), w *fsnotify.Watcher, files *File) { | ||||
defer func(w *fsnotify.Watcher) { | ||||
err := w.Close() | ||||
if err != nil { | ||||
logger.Error(err, "failed to close fsnotify watcher") | ||||
} | ||||
}(w) | ||||
err := w.Add(files.Name) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider a case: people remove the file and then create a new file with the same name. The fsnotify won't be triggered in this case. What about watching the file's directory and filtering according to the name? |
||||
if err != nil { | ||||
logger.Error(err, "add file to watcher failed") | ||||
} | ||||
for { | ||||
spacewander marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
select { | ||||
case event, ok := <-w.Events: | ||||
if !ok { | ||||
return | ||||
} | ||||
logger.Info(fmt.Sprintf("event: %v", event)) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just put the event as an argument is enough, we don't need to use Sprintf There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When I use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As the error message indicated, the arguments should be key-value pairs, which have even numbers, for example: htnn/api/pkg/plugins/plugins.go Line 73 in 1d5db16
|
||||
onChange() | ||||
return | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should keep watching, and should not exit after the first event. This is a waste of goroutine and watcher, and force the user to rewatch each time. |
||||
case err, ok := <-w.Errors: | ||||
if !ok { | ||||
return | ||||
} | ||||
logger.Error(err, "error watching files") | ||||
} | ||||
} | ||||
|
||||
return file.Mtime().Before(item.Value().ModTime()) | ||||
} | ||||
|
||||
func (f *fs) Stat(path string) (*File, error) { | ||||
func (f *Fsnotify) Stat(path string) (*File, error) { | ||||
info, err := os.Stat(path) | ||||
if err != nil { | ||||
return nil, err | ||||
} | ||||
f.cache.Set(path, info, ttlcache.DefaultTTL) | ||||
|
||||
return &File{ | ||||
Name: path, | ||||
|
@@ -115,24 +139,5 @@ | |||
} | ||||
|
||||
func Stat(path string) (*File, error) { | ||||
return defaultFs.Stat(path) | ||||
} | ||||
|
||||
func Update(files ...*File) bool { | ||||
for _, file := range files { | ||||
if !defaultFs.update(file) { | ||||
return false | ||||
} | ||||
} | ||||
return true | ||||
} | ||||
|
||||
func (f *fs) update(file *File) bool { | ||||
item := f.cache.Get(file.Name) | ||||
if item == nil { | ||||
return false | ||||
} | ||||
|
||||
file.SetMtime(item.Value().ModTime()) | ||||
return true | ||||
return defaultFsnotify.Stat(path) | ||||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -16,28 +16,45 @@ package file | |||||
|
||||||
import ( | ||||||
"os" | ||||||
"sync" | ||||||
"testing" | ||||||
"time" | ||||||
|
||||||
"github.com/stretchr/testify/assert" | ||||||
) | ||||||
|
||||||
func TestFileMtimeDetection(t *testing.T) { | ||||||
defaultFs = newFS(2000 * time.Millisecond) | ||||||
func TestFileIsChanged(t *testing.T) { | ||||||
var mu sync.Mutex | ||||||
i := 1 | ||||||
tmpfile, _ := os.CreateTemp("./", "example") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
would be better so that the test case won't pollute the git repo. |
||||||
defer func(name string) { | ||||||
err := os.Remove(name) | ||||||
if err != nil { | ||||||
t.Logf("%v", err) | ||||||
} | ||||||
}(tmpfile.Name()) | ||||||
|
||||||
tmpfile, _ := os.CreateTemp("", "example") | ||||||
defer os.Remove(tmpfile.Name()) // clean up | ||||||
|
||||||
f, err := Stat(tmpfile.Name()) | ||||||
assert.Nil(t, err) | ||||||
assert.False(t, IsChanged(f)) | ||||||
time.Sleep(1000 * time.Millisecond) | ||||||
file := &File{Name: tmpfile.Name()} | ||||||
_ = WatchFiles(func() { | ||||||
mu.Lock() | ||||||
i = 2 | ||||||
mu.Unlock() | ||||||
}, file) | ||||||
time.Sleep(1 * time.Millisecond) | ||||||
tmpfile.Write([]byte("bls")) | ||||||
tmpfile.Close() | ||||||
assert.False(t, IsChanged(f)) | ||||||
tmpfile.Sync() | ||||||
mu.Lock() | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does the lock here guarantee the callback in WatchFiles run before assertion? The assertion may get the lock first. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The lock can be removed? |
||||||
assert.Equal(t, 2, i) | ||||||
mu.Unlock() | ||||||
|
||||||
time.Sleep(2500 * time.Millisecond) | ||||||
assert.True(t, IsChanged(f)) | ||||||
assert.True(t, Update(f)) | ||||||
assert.False(t, IsChanged(f)) | ||||||
_ = WatchFiles(func() { | ||||||
mu.Lock() | ||||||
i = 1 | ||||||
mu.Unlock() | ||||||
}, file) | ||||||
time.Sleep(1 * time.Millisecond) | ||||||
tmpfile.Sync() | ||||||
mu.Lock() | ||||||
assert.Equal(t, 2, i) | ||||||
mu.Unlock() | ||||||
} |
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -35,13 +35,9 @@ | |||
config *config | ||||
} | ||||
|
||||
func (f *filter) DecodeHeaders(headers api.RequestHeaderMap, endStream bool) api.ResultAction { | ||||
func (f *filter) reloadEnforcer() { | ||||
conf := f.config | ||||
role, _ := headers.Get(conf.Token.Name) // role can be "" | ||||
url := headers.Url() | ||||
|
||||
policyChanged := file.IsChanged(conf.modelFile, conf.policyFile) | ||||
if policyChanged && !conf.updating.Load() { | ||||
if !conf.updating.Load() { | ||||
conf.updating.Store(true) | ||||
api.LogWarnf("policy %s or model %s changed, reload enforcer", conf.policyFile.Name, conf.modelFile.Name) | ||||
|
||||
|
@@ -58,11 +54,28 @@ | |||
conf.enforcer = e | ||||
conf.lock.Unlock() | ||||
|
||||
file.Update(conf.modelFile, conf.policyFile) | ||||
err = file.Update(func() { | ||||
f.reloadEnforcer() | ||||
}, conf.modelFile, conf.policyFile) | ||||
if err != nil { | ||||
api.LogErrorf("failed to update Enforcer: %v", err) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The msg is not precise. The code above doesn't update Enforcer |
||||
} | ||||
api.LogWarnf("policy %s or model %s changed, enforcer reloaded", conf.policyFile.Name, conf.modelFile.Name) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This log should move up, just after we change the enforcer |
||||
} | ||||
}() | ||||
} | ||||
} | ||||
func (f *filter) DecodeHeaders(headers api.RequestHeaderMap, endStream bool) api.ResultAction { | ||||
conf := f.config | ||||
role, _ := headers.Get(conf.Token.Name) // role can be "" | ||||
url := headers.Url() | ||||
|
||||
err := file.WatchFiles(f.reloadEnforcer, conf.modelFile, conf.policyFile) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should add file to watcher when init the conf: htnn/plugins/plugins/casbin/config.go Line 56 in e2cbf68
The DecodeHeaders is called per req. |
||||
|
||||
if err != nil { | ||||
api.LogErrorf("failed to watch files: %v", err) | ||||
return &api.LocalResponse{Code: 500} | ||||
} | ||||
|
||||
conf.lock.RLock() | ||||
ok, err := f.config.enforcer.Enforce(role, url.Path, headers.Method()) | ||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need this return?