From 9348813f622c1f81f69bc93ca23477026385cf1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20K=C3=A4ldstr=C3=B6m?= Date: Tue, 30 Jul 2019 16:42:37 +0300 Subject: [PATCH 1/7] Move monitor to pkg/util/sync --- pkg/storage/sync/storage.go | 6 +++--- pkg/storage/watch/storage.go | 6 +++--- pkg/storage/watch/watch.go | 10 +++++----- pkg/util/{ => sync}/monitor.go | 2 +- 4 files changed, 12 insertions(+), 12 deletions(-) rename pkg/util/{ => sync}/monitor.go (96%) diff --git a/pkg/storage/sync/storage.go b/pkg/storage/sync/storage.go index 57091a09b..63dd267f7 100644 --- a/pkg/storage/sync/storage.go +++ b/pkg/storage/sync/storage.go @@ -9,7 +9,7 @@ import ( "github.com/weaveworks/ignite/pkg/storage" "github.com/weaveworks/ignite/pkg/storage/watch" "github.com/weaveworks/ignite/pkg/storage/watch/update" - "github.com/weaveworks/ignite/pkg/util" + "github.com/weaveworks/ignite/pkg/util/sync" "k8s.io/apimachinery/pkg/runtime/schema" ) @@ -28,7 +28,7 @@ type SyncStorage struct { storages []storage.Storage eventStream watch.AssociatedEventStream updateStream UpdateStream - monitor *util.Monitor + monitor *sync.Monitor } var _ storage.Storage = &SyncStorage{} @@ -48,7 +48,7 @@ func NewSyncStorage(rwStorage storage.Storage, wStorages ...storage.Storage) sto } if ss.eventStream != nil { - ss.monitor = util.RunMonitor(ss.monitorFunc) + ss.monitor = sync.RunMonitor(ss.monitorFunc) } return ss diff --git a/pkg/storage/watch/storage.go b/pkg/storage/watch/storage.go index 48867b61f..e3da85043 100644 --- a/pkg/storage/watch/storage.go +++ b/pkg/storage/watch/storage.go @@ -11,7 +11,7 @@ import ( "github.com/weaveworks/ignite/pkg/storage" "github.com/weaveworks/ignite/pkg/storage/manifest/raw" "github.com/weaveworks/ignite/pkg/storage/watch/update" - "github.com/weaveworks/ignite/pkg/util" + "github.com/weaveworks/ignite/pkg/util/sync" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/yaml" @@ -45,7 +45,7 @@ func NewGenericWatchStorage(storage storage.Storage) (WatchStorage, error) { } if mapped, ok := s.RawStorage().(raw.MappedRawStorage); ok { - s.monitor = util.RunMonitor(func() { + s.monitor = sync.RunMonitor(func() { s.monitorFunc(mapped, files) // Offload the file registration to the goroutine }) } @@ -58,7 +58,7 @@ type GenericWatchStorage struct { storage.Storage watcher *watcher events *AssociatedEventStream - monitor *util.Monitor + monitor *sync.Monitor } var _ WatchStorage = &GenericWatchStorage{} diff --git a/pkg/storage/watch/watch.go b/pkg/storage/watch/watch.go index ca8dac3e1..0f43e1455 100644 --- a/pkg/storage/watch/watch.go +++ b/pkg/storage/watch/watch.go @@ -10,7 +10,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/weaveworks/ignite/pkg/storage" "github.com/weaveworks/ignite/pkg/storage/watch/update" - "github.com/weaveworks/ignite/pkg/util" + "github.com/weaveworks/ignite/pkg/util/sync" ) const eventBuffer = 4096 // How many events and updates we can buffer before watching is interrupted @@ -62,8 +62,8 @@ type watcher struct { updates UpdateStream watches watches suspendEvent update.Event - monitor *util.Monitor - dispatcher *util.Monitor + monitor *sync.Monitor + dispatcher *sync.Monitor // the batcher is used for properly sending many concurrent inotify events // as a group, after a specified timeout. This fixes the issue of one single // file operation being registered as many different inotify events @@ -111,8 +111,8 @@ func newWatcher(dir string) (w *watcher, files []string, err error) { if err = w.start(&files); err != nil { notify.Stop(w.events) } else { - w.monitor = util.RunMonitor(w.monitorFunc) - w.dispatcher = util.RunMonitor(w.dispatchFunc) + w.monitor = sync.RunMonitor(w.monitorFunc) + w.dispatcher = sync.RunMonitor(w.dispatchFunc) } return diff --git a/pkg/util/monitor.go b/pkg/util/sync/monitor.go similarity index 96% rename from pkg/util/monitor.go rename to pkg/util/sync/monitor.go index dd0149baa..f09c55ca4 100644 --- a/pkg/util/monitor.go +++ b/pkg/util/sync/monitor.go @@ -1,4 +1,4 @@ -package util +package sync import "sync" From 14345e859cd41cdda9b20a679f1c13bdcbb42714 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20K=C3=A4ldstr=C3=B6m?= Date: Tue, 30 Jul 2019 17:02:15 +0300 Subject: [PATCH 2/7] Rename/move watch.Batcher to sync.WriteBatcher --- pkg/storage/watch/batcher.go | 69 --------------- pkg/storage/watch/watch.go | 4 +- pkg/util/sync/batcher.go | 85 +++++++++++++++++++ .../watch => util/sync}/batcher_test.go | 6 +- 4 files changed, 90 insertions(+), 74 deletions(-) delete mode 100644 pkg/storage/watch/batcher.go create mode 100644 pkg/util/sync/batcher.go rename pkg/{storage/watch => util/sync}/batcher_test.go (92%) diff --git a/pkg/storage/watch/batcher.go b/pkg/storage/watch/batcher.go deleted file mode 100644 index 9612dc996..000000000 --- a/pkg/storage/watch/batcher.go +++ /dev/null @@ -1,69 +0,0 @@ -package watch - -import ( - "sync" - "time" - - log "github.com/sirupsen/logrus" -) - -func NewBatcher(duration time.Duration) *Batcher { - return &Batcher{ - duration: duration, - flushCh: make(chan struct{}), - syncMap: &sync.Map{}, - } -} - -type Batcher struct { - duration time.Duration - timer *time.Timer - flushCh chan struct{} - syncMap *sync.Map -} - -func (b *Batcher) Load(key interface{}) (value interface{}, ok bool) { - return b.syncMap.Load(key) -} - -func (b *Batcher) Store(key, value interface{}) { - // prevent the timer from firing as we're manipulating it now - b.cancelUnfiredTimer() - // store the key and the value as requested - log.Tracef("Batcher: Storing key %v and value %q, reset the timer.", key, value) - b.syncMap.Store(key, value) - // set the timer to fire after the duration, unless there's a new .Store call - b.dispatchAfterTimeout() -} - -func (b *Batcher) Close() { - log.Trace("Batcher: Closing the batch channel") - close(b.flushCh) -} - -func (b *Batcher) ProcessBatch(fn func(key, val interface{}) bool) bool { - if _, ok := <-b.flushCh; !ok { - // channel is closed - return false - } - log.Trace("Batcher: Received a flush for the batch. Dispatching it now.") - b.syncMap.Range(fn) - *b.syncMap = sync.Map{} - return true -} - -func (b *Batcher) cancelUnfiredTimer() { - // If the timer already exists; stop it - if b.timer != nil { - log.Tracef("Batcher: Cancelled timer") - b.timer.Stop() - b.timer = nil - } -} - -func (b *Batcher) dispatchAfterTimeout() { - b.timer = time.AfterFunc(b.duration, func() { - log.Tracef("Batcher: Dispatching a batch job") - b.flushCh <- struct{}{} - }) -} diff --git a/pkg/storage/watch/watch.go b/pkg/storage/watch/watch.go index 0f43e1455..5e316ed99 100644 --- a/pkg/storage/watch/watch.go +++ b/pkg/storage/watch/watch.go @@ -67,7 +67,7 @@ type watcher struct { // the batcher is used for properly sending many concurrent inotify events // as a group, after a specified timeout. This fixes the issue of one single // file operation being registered as many different inotify events - batcher *Batcher + batcher *sync.WriteBatcher } func (w *watcher) addWatch(path string) (err error) { @@ -105,7 +105,7 @@ func newWatcher(dir string) (w *watcher, files []string, err error) { dir: dir, events: make(eventStream, eventBuffer), updates: make(UpdateStream, eventBuffer), - batcher: NewBatcher(dispatchDuration), + batcher: sync.NewWriteBatcher(dispatchDuration), } if err = w.start(&files); err != nil { diff --git a/pkg/util/sync/batcher.go b/pkg/util/sync/batcher.go new file mode 100644 index 000000000..a6f52aaa8 --- /dev/null +++ b/pkg/util/sync/batcher.go @@ -0,0 +1,85 @@ +package sync + +import ( + "sync" + "time" + + log "github.com/sirupsen/logrus" +) + +// NewWriteBatcher creates a new WriteBatcher +func NewWriteBatcher(duration time.Duration) *WriteBatcher { + return &WriteBatcher{ + duration: duration, + flushCh: make(chan struct{}), + syncMap: &sync.Map{}, + } +} + +// WriteBatcher is a struct that wraps a concurrent sync.Map +// and dispatches all writes to it at once, a specific +// duration (e.g. 1s) after the last write was performed. This +// allows for 100s of concurrent writes in milliseconds to the +// same map in one sending goroutine; and one receiving goroutine +// which can process the result after all the writes are done. +type WriteBatcher struct { + duration time.Duration + timer *time.Timer + flushCh chan struct{} + syncMap *sync.Map +} + +// Load reads the key from the map +func (b *WriteBatcher) Load(key interface{}) (value interface{}, ok bool) { + return b.syncMap.Load(key) +} + +// Store writes the value for the specified key to the map +// If no other .Store call is made during the specified duration, +// flushCh is invoked and ProcessBatch unblocks in the other goroutine +func (b *WriteBatcher) Store(key, value interface{}) { + // prevent the timer from firing as we're manipulating it now + b.cancelUnfiredTimer() + // store the key and the value as requested + log.Tracef("WriteBatcher: Storing key %v and value %q, reset the timer.", key, value) + b.syncMap.Store(key, value) + // set the timer to fire after the duration, unless there's a new .Store call + b.dispatchAfterTimeout() +} + +// Close closes the underlying channel +func (b *WriteBatcher) Close() { + log.Trace("WriteBatcher: Closing the batch channel") + close(b.flushCh) +} + +// ProcessBatch is effectively a Range over the sync.Map, once a batch write is +// released. This should be used in the receiving goroutine. The internal map is +// reset after this call, so be sure to capture all the contents if needed. This +// function returns false if Close() has been called. +func (b *WriteBatcher) ProcessBatch(fn func(key, val interface{}) bool) bool { + if _, ok := <-b.flushCh; !ok { + // channel is closed + return false + } + log.Trace("WriteBatcher: Received a flush for the batch. Dispatching it now.") + b.syncMap.Range(fn) + *b.syncMap = sync.Map{} + return true +} + +func (b *WriteBatcher) cancelUnfiredTimer() { + // If the timer already exists; stop it + if b.timer != nil { + log.Tracef("WriteBatcher: Cancelled timer") + b.timer.Stop() + b.timer = nil + } +} + +func (b *WriteBatcher) dispatchAfterTimeout() { + b.timer = time.AfterFunc(b.duration, func() { + log.Tracef("WriteBatcher: Dispatching a batch job") + b.flushCh <- struct{}{} + }) +} diff --git a/pkg/storage/watch/batcher_test.go b/pkg/util/sync/batcher_test.go similarity index 92% rename from pkg/storage/watch/batcher_test.go rename to pkg/util/sync/batcher_test.go index 81f149b4a..0d851aae9 100644 --- a/pkg/storage/watch/batcher_test.go +++ b/pkg/util/sync/batcher_test.go @@ -1,4 +1,4 @@ -package watch +package sync import ( "fmt" @@ -14,9 +14,9 @@ type job struct { event string } -func TestBatcher(t *testing.T) { +func TestWriteBatcher(t *testing.T) { ch := make(chan job) - b := NewBatcher(1 * time.Second) + b := NewWriteBatcher(1 * time.Second) go func() { for i := 0; i < 10; i++ { fmt.Println(i) From 2d1c6177ff7cf46ddaf84cb6060893c79f843f9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20K=C3=A4ldstr=C3=B6m?= Date: Tue, 30 Jul 2019 17:27:05 +0300 Subject: [PATCH 3/7] Move the watcher to pkg/util/watcher --- pkg/gitops/gitops.go | 10 +- pkg/storage/sync/storage.go | 5 +- pkg/storage/watch/storage.go | 25 +- pkg/storage/watch/update/associated.go | 13 - pkg/storage/watch/update/file.go | 8 - pkg/storage/watch/update/update.go | 12 +- pkg/storage/watch/watcher_test.go | 48 ---- .../watch/update => util/watcher}/event.go | 9 +- .../update => util/watcher}/event_test.go | 2 +- .../watch.go => util/watcher/filewatcher.go} | 236 ++++++++++-------- pkg/util/watcher/filewatcher_test.go | 47 ++++ 11 files changed, 218 insertions(+), 197 deletions(-) delete mode 100644 pkg/storage/watch/update/associated.go delete mode 100644 pkg/storage/watch/update/file.go delete mode 100644 pkg/storage/watch/watcher_test.go rename pkg/{storage/watch/update => util/watcher}/event.go (85%) rename pkg/{storage/watch/update => util/watcher}/event_test.go (94%) rename pkg/{storage/watch/watch.go => util/watcher/filewatcher.go} (53%) create mode 100644 pkg/util/watcher/filewatcher_test.go diff --git a/pkg/gitops/gitops.go b/pkg/gitops/gitops.go index d11222f59..ef3487c1e 100644 --- a/pkg/gitops/gitops.go +++ b/pkg/gitops/gitops.go @@ -13,7 +13,7 @@ import ( "github.com/weaveworks/ignite/pkg/operations" "github.com/weaveworks/ignite/pkg/storage/cache" "github.com/weaveworks/ignite/pkg/storage/manifest" - "github.com/weaveworks/ignite/pkg/storage/watch/update" + "github.com/weaveworks/ignite/pkg/util/watcher" "github.com/weaveworks/ignite/pkg/util" ) @@ -58,7 +58,7 @@ func RunLoop(url, branch string, paths []string) error { } var vm *api.VM - if upd.Event == update.EventDelete { + if upd.Event == watcher.EventDelete { // As we know this VM was deleted, it wouldn't show up in a Get() call // Construct a temporary VM object for passing to the delete function vm = &api.VM{ @@ -86,15 +86,15 @@ func RunLoop(url, branch string, paths []string) error { // TODO: Paralellization switch upd.Event { - case update.EventCreate: + case watcher.EventCreate: runHandle(func() error { return handleCreate(vm) }) - case update.EventModify: + case watcher.EventModify: runHandle(func() error { return handleChange(vm) }) - case update.EventDelete: + case watcher.EventDelete: runHandle(func() error { // TODO: Temporary VM Object for removal return handleDelete(vm) diff --git a/pkg/storage/sync/storage.go b/pkg/storage/sync/storage.go index 63dd267f7..ef9d2e245 100644 --- a/pkg/storage/sync/storage.go +++ b/pkg/storage/sync/storage.go @@ -10,6 +10,7 @@ import ( "github.com/weaveworks/ignite/pkg/storage/watch" "github.com/weaveworks/ignite/pkg/storage/watch/update" "github.com/weaveworks/ignite/pkg/util/sync" + "github.com/weaveworks/ignite/pkg/util/watcher" "k8s.io/apimachinery/pkg/runtime/schema" ) @@ -142,7 +143,7 @@ func (ss *SyncStorage) monitorFunc() { if ok { switch upd.Event { - case update.EventModify, update.EventCreate: + case watcher.EventModify, watcher.EventCreate: // First load the Object using the Storage given in the update, // then set it using the client constructed above updClient := client.NewClient(upd.Storage).Dynamic(upd.APIType.GetKind()) @@ -156,7 +157,7 @@ func (ss *SyncStorage) monitorFunc() { log.Errorf("Failed to set Object with UID %q: %v", upd.APIType.GetUID(), err) continue } - case update.EventDelete: + case watcher.EventDelete: // For deletion we use the generated "fake" APIType object if err := c.Dynamic(upd.APIType.GetKind()).Delete(upd.APIType.GetUID()); err != nil { log.Errorf("Failed to delete Object with UID %q: %v", upd.APIType.GetUID(), err) diff --git a/pkg/storage/watch/storage.go b/pkg/storage/watch/storage.go index e3da85043..803980133 100644 --- a/pkg/storage/watch/storage.go +++ b/pkg/storage/watch/storage.go @@ -12,6 +12,7 @@ import ( "github.com/weaveworks/ignite/pkg/storage/manifest/raw" "github.com/weaveworks/ignite/pkg/storage/watch/update" "github.com/weaveworks/ignite/pkg/util/sync" + "github.com/weaveworks/ignite/pkg/util/watcher" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/yaml" @@ -40,7 +41,7 @@ func NewGenericWatchStorage(storage storage.Storage) (WatchStorage, error) { var err error var files []string - if s.watcher, files, err = newWatcher(storage.RawStorage().Dir()); err != nil { + if s.watcher, files, err = watcher.NewFileWatcher(storage.RawStorage().Dir()); err != nil { return nil, err } @@ -56,7 +57,7 @@ func NewGenericWatchStorage(storage storage.Storage) (WatchStorage, error) { // GenericWatchStorage implements the WatchStorage interface type GenericWatchStorage struct { storage.Storage - watcher *watcher + watcher *watcher.FileWatcher events *AssociatedEventStream monitor *sync.Monitor } @@ -65,19 +66,19 @@ var _ WatchStorage = &GenericWatchStorage{} // Suspend modify events during Set func (s *GenericWatchStorage) Set(gvk schema.GroupVersionKind, obj meta.Object) error { - s.watcher.suspend(update.EventModify) + s.watcher.Suspend(watcher.EventModify) return s.Storage.Set(gvk, obj) } // Suspend modify events during Patch func (s *GenericWatchStorage) Patch(gvk schema.GroupVersionKind, uid meta.UID, patch []byte) error { - s.watcher.suspend(update.EventModify) + s.watcher.Suspend(watcher.EventModify) return s.Storage.Patch(gvk, uid, patch) } // Suspend delete events during Delete func (s *GenericWatchStorage) Delete(gvk schema.GroupVersionKind, uid meta.UID) error { - s.watcher.suspend(update.EventDelete) + s.watcher.Suspend(watcher.EventDelete) return s.Storage.Delete(gvk, uid) } @@ -86,7 +87,7 @@ func (s *GenericWatchStorage) SetEventStream(eventStream AssociatedEventStream) } func (s *GenericWatchStorage) Close() { - s.watcher.close() + s.watcher.Close() s.monitor.Wait() } @@ -100,16 +101,16 @@ func (s *GenericWatchStorage) monitorFunc(mapped raw.MappedRawStorage, files []s } else { mapped.AddMapping(storage.NewKey(obj.GetKind(), obj.GetUID()), file) // Send the event to the events channel - s.sendEvent(update.EventModify, obj) + s.sendEvent(watcher.EventModify, obj) } } for { - if event, ok := <-s.watcher.updates; ok { + if event, ok := <-s.watcher.GetFileUpdateStream(); ok { var obj meta.Object var err error - if event.Event == update.EventDelete { + if event.Event == watcher.EventDelete { var key storage.Key if key, err = mapped.GetMapping(event.Path); err != nil { log.Warnf("Failed to retrieve data for %q: %v", event.Path, err) @@ -131,8 +132,8 @@ func (s *GenericWatchStorage) monitorFunc(mapped raw.MappedRawStorage, files []s continue } - // This is based on the key's existence instead of update.EventCreate, - // as Objects can get updated (via update.EventModify) to be conformant + // This is based on the key's existence instead of watcher.EventCreate, + // as Objects can get updated (via watcher.EventModify) to be conformant if _, err = mapped.GetMapping(event.Path); err != nil { mapped.AddMapping(storage.NewKey(obj.GetKind(), obj.GetUID()), event.Path) } @@ -146,7 +147,7 @@ func (s *GenericWatchStorage) monitorFunc(mapped raw.MappedRawStorage, files []s } } -func (s *GenericWatchStorage) sendEvent(event update.Event, obj meta.Object) { +func (s *GenericWatchStorage) sendEvent(event watcher.Event, obj meta.Object) { if s.events != nil { *s.events <- update.AssociatedUpdate{ Update: update.Update{ diff --git a/pkg/storage/watch/update/associated.go b/pkg/storage/watch/update/associated.go deleted file mode 100644 index 239db858d..000000000 --- a/pkg/storage/watch/update/associated.go +++ /dev/null @@ -1,13 +0,0 @@ -package update - -import ( - "github.com/weaveworks/ignite/pkg/storage" -) - -// AssociatedUpdate bundles together an Update and a Storage -// implementation. This is used by SyncStorage to query the -// correct Storage for the updated Object. -type AssociatedUpdate struct { - Update - Storage storage.Storage -} diff --git a/pkg/storage/watch/update/file.go b/pkg/storage/watch/update/file.go deleted file mode 100644 index 706bb975f..000000000 --- a/pkg/storage/watch/update/file.go +++ /dev/null @@ -1,8 +0,0 @@ -package update - -// FileUpdate is used by watchers to -// signal the state change of a file. -type FileUpdate struct { - Event Event - Path string -} diff --git a/pkg/storage/watch/update/update.go b/pkg/storage/watch/update/update.go index 41e054c79..804069189 100644 --- a/pkg/storage/watch/update/update.go +++ b/pkg/storage/watch/update/update.go @@ -2,11 +2,21 @@ package update import ( meta "github.com/weaveworks/ignite/pkg/apis/meta/v1alpha1" + "github.com/weaveworks/ignite/pkg/storage" + "github.com/weaveworks/ignite/pkg/util/watcher" ) // Update bundles an Event with an // APIType for Storage retrieval. type Update struct { - Event Event + Event watcher.Event APIType meta.Object } + +// AssociatedUpdate bundles together an Update and a Storage +// implementation. This is used by SyncStorage to query the +// correct Storage for the updated Object. +type AssociatedUpdate struct { + Update + Storage storage.Storage +} \ No newline at end of file diff --git a/pkg/storage/watch/watcher_test.go b/pkg/storage/watch/watcher_test.go deleted file mode 100644 index 59f2546ca..000000000 --- a/pkg/storage/watch/watcher_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package watch - -import ( - "github.com/weaveworks/ignite/pkg/storage/watch/update" - "reflect" - "testing" -) - -var testEvents = []update.Events{ - { - update.EventDelete, - update.EventCreate, - update.EventModify, - }, - { - update.EventCreate, - update.EventModify, - update.EventDelete, - }, - { - update.EventCreate, - update.EventModify, - update.EventDelete, - update.EventCreate, - }, -} - -var targets = []update.Events{ - { - update.EventModify, - }, - { - update.EventNone, - }, - { - update.EventNone, - update.EventCreate, - }, -} - -func TestEventConcatenation(t *testing.T) { - for i, e := range testEvents { - result := concatenateEvents(e) - if !reflect.DeepEqual(result, targets[i]) { - t.Errorf("wrong concatenation result: %v != %v", result, targets[i]) - } - } -} diff --git a/pkg/storage/watch/update/event.go b/pkg/util/watcher/event.go similarity index 85% rename from pkg/storage/watch/update/event.go rename to pkg/util/watcher/event.go index 7ac35161b..b8b754e21 100644 --- a/pkg/storage/watch/update/event.go +++ b/pkg/util/watcher/event.go @@ -1,4 +1,4 @@ -package update +package watcher import ( "fmt" @@ -51,3 +51,10 @@ func (e Events) Bytes() []byte { return b } + +// FileUpdate is used by watchers to +// signal the state change of a file. +type FileUpdate struct { + Event Event + Path string +} \ No newline at end of file diff --git a/pkg/storage/watch/update/event_test.go b/pkg/util/watcher/event_test.go similarity index 94% rename from pkg/storage/watch/update/event_test.go rename to pkg/util/watcher/event_test.go index f323b9c6f..6d6c31cac 100644 --- a/pkg/storage/watch/update/event_test.go +++ b/pkg/util/watcher/event_test.go @@ -1,4 +1,4 @@ -package update +package watcher import ( "reflect" diff --git a/pkg/storage/watch/watch.go b/pkg/util/watcher/filewatcher.go similarity index 53% rename from pkg/storage/watch/watch.go rename to pkg/util/watcher/filewatcher.go index 5e316ed99..6757b1dac 100644 --- a/pkg/storage/watch/watch.go +++ b/pkg/util/watcher/filewatcher.go @@ -1,4 +1,4 @@ -package watch +package watcher import ( "bytes" @@ -8,70 +8,115 @@ import ( "github.com/rjeczalik/notify" log "github.com/sirupsen/logrus" - "github.com/weaveworks/ignite/pkg/storage" - "github.com/weaveworks/ignite/pkg/storage/watch/update" "github.com/weaveworks/ignite/pkg/util/sync" ) const eventBuffer = 4096 // How many events and updates we can buffer before watching is interrupted -const dispatchDuration = 1 * time.Second // Duration to wait after last event before dispatching grouped inotify events -var excludeDirs = []string{".git"} var listenEvents = []notify.Event{notify.InCreate, notify.InDelete, notify.InDeleteSelf, notify.InCloseWrite} -var eventMap = map[notify.Event]update.Event{ - notify.InCreate: update.EventCreate, - notify.InDelete: update.EventDelete, - notify.InCloseWrite: update.EventModify, +var eventMap = map[notify.Event]Event{ + notify.InCreate: EventCreate, + notify.InDelete: EventDelete, + notify.InCloseWrite: EventModify, } // combinedEvent describes multiple events that should be concatenated into a single event type combinedEvent struct { input []byte // input is a slice of events to match (in bytes, it speeds up the comparison) - output update.Event // output is the resulting event that should be returned + output Event // output is the resulting event that should be returned } // combinedEvents describes the event combinations to concatenate, // this is iterated in order, so the longest matches should be first var combinedEvents = []combinedEvent{ // DELETE + CREATE + MODIFY => MODIFY - {update.Events{update.EventDelete, update.EventCreate, update.EventModify}.Bytes(), update.EventModify}, + {Events{EventDelete, EventCreate, EventModify}.Bytes(), EventModify}, // CREATE + MODIFY => CREATE - {update.Events{update.EventCreate, update.EventModify}.Bytes(), update.EventCreate}, + {Events{EventCreate, EventModify}.Bytes(), EventCreate}, // CREATE + DELETE => NONE - {update.Events{update.EventCreate, update.EventDelete}.Bytes(), update.EventNone}, + {Events{EventCreate, EventDelete}.Bytes(), EventNone}, } // Suppress duplicate events registered in this map. E.g. directory deletion // fires two DELETE events, one for the parent and one for the deleted directory itself -var suppressDuplicates = map[update.Event]bool{ - update.EventCreate: true, - update.EventDelete: true, +var suppressDuplicates = map[Event]bool{ + EventCreate: true, + EventDelete: true, } type eventStream chan notify.EventInfo -type UpdateStream chan *update.FileUpdate +type FileUpdateStream chan *FileUpdate type watches []string -// watcher recursively monitors changes in files in the given directory +// Options specifies options for the FileWatcher +type Options struct { + // ExcludeDirs specifies what directories to not watch + ExcludeDirs []string + // BatchTimeout specifies the duration to wait after last event before dispatching grouped inotify events + BatchTimeout time.Duration + // ValidExtensions specifies what file extensions to look at + ValidExtensions []string +} + +// DefaultOptions returns the default options +func DefaultOptions() Options { + return Options{ + ExcludeDirs: []string{".git"}, + BatchTimeout: 1 * time.Second, + ValidExtensions: []string{".yaml", ".yml", ".json"}, + } +} + +// NewFileWatcher returns a list of files in the watched directory in +// addition to the generated FileWatcher, it can be used to populate +// MappedRawStorage fileMappings +func NewFileWatcher(dir string) (w *FileWatcher, files []string, err error) { + return NewFileWatcherWithOptions(dir, DefaultOptions()) +} + +// NewFileWatcher returns a list of files in the watched directory in +// addition to the generated FileWatcher, it can be used to populate +// MappedRawStorage fileMappings +func NewFileWatcherWithOptions(dir string, opts Options) (w *FileWatcher, files []string, err error) { + w = &FileWatcher{ + dir: dir, + events: make(eventStream, eventBuffer), + updates: make(FileUpdateStream, eventBuffer), + batcher: sync.NewWriteBatcher(opts.BatchTimeout), + opts: opts, + } + + if err = w.start(&files); err != nil { + notify.Stop(w.events) + } else { + w.monitor = sync.RunMonitor(w.monitorFunc) + w.dispatcher = sync.RunMonitor(w.dispatchFunc) + } + + return +} + +// FileWatcher recursively monitors changes in files in the given directory // and sends out events based on their state changes. Only files conforming -// to validSuffix are monitored. The watcher can be suspended for a single +// to validSuffix are monitored. The FileWatcher can be suspended for a single // event at a time to eliminate updates by WatchStorage causing a loop. -type watcher struct { +type FileWatcher struct { dir string events eventStream - updates UpdateStream + updates FileUpdateStream watches watches - suspendEvent update.Event + suspendEvent Event monitor *sync.Monitor dispatcher *sync.Monitor + opts Options // the batcher is used for properly sending many concurrent inotify events // as a group, after a specified timeout. This fixes the issue of one single // file operation being registered as many different inotify events batcher *sync.WriteBatcher } -func (w *watcher) addWatch(path string) (err error) { - log.Tracef("Watcher: Adding watch for %q", path) +func (w *FileWatcher) addWatch(path string) (err error) { + log.Tracef("FileWatcher: Adding watch for %q", path) if err = notify.Watch(path, w.events, listenEvents...); err == nil { w.watches = append(w.watches, path) } @@ -79,48 +124,27 @@ func (w *watcher) addWatch(path string) (err error) { return } -func (w *watcher) hasWatch(path string) bool { +func (w *FileWatcher) hasWatch(path string) bool { for _, watch := range w.watches { if watch == path { - log.Tracef("Watcher: Watch found for %q", path) + log.Tracef("FileWatcher: Watch found for %q", path) return true } } - log.Tracef("Watcher: No watch found for %q", path) + log.Tracef("FileWatcher: No watch found for %q", path) return false } -func (w *watcher) clear() { - log.Tracef("Watcher: Clearing all watches") +func (w *FileWatcher) clear() { + log.Tracef("FileWatcher: Clearing all watches") notify.Stop(w.events) w.watches = w.watches[:0] } -// newWatcher returns a list of files in the watched directory in -// addition to the generated watcher, it can be used to populate -// MappedRawStorage fileMappings -func newWatcher(dir string) (w *watcher, files []string, err error) { - w = &watcher{ - dir: dir, - events: make(eventStream, eventBuffer), - updates: make(UpdateStream, eventBuffer), - batcher: sync.NewWriteBatcher(dispatchDuration), - } - - if err = w.start(&files); err != nil { - notify.Stop(w.events) - } else { - w.monitor = sync.RunMonitor(w.monitorFunc) - w.dispatcher = sync.RunMonitor(w.dispatchFunc) - } - - return -} - // start discovers all subdirectories and adds paths to // notify before starting the monitoring goroutine -func (w *watcher) start(files *[]string) error { +func (w *FileWatcher) start(files *[]string) error { return filepath.Walk(w.dir, func(path string, info os.FileInfo, err error) error { if err != nil { @@ -128,7 +152,7 @@ func (w *watcher) start(files *[]string) error { } if info.IsDir() { - for _, dir := range excludeDirs { + for _, dir := range w.opts.ExcludeDirs { if info.Name() == dir { return filepath.SkipDir // Skip excluded directories } @@ -139,7 +163,7 @@ func (w *watcher) start(files *[]string) error { if files != nil { // Only include files with a valid suffix - if validSuffix(info.Name()) { + if w.validSuffix(info.Name()) { *files = append(*files, path) } } @@ -148,10 +172,10 @@ func (w *watcher) start(files *[]string) error { }) } -func (w *watcher) monitorFunc() { - log.Debug("Watcher: Monitoring thread started") - defer log.Debug("Watcher: Monitoring thread stopped") - defer close(w.updates) // Close the update stream after the watcher has stopped +func (w *FileWatcher) monitorFunc() { + log.Debug("FileWatcher: Monitoring thread started") + defer log.Debug("FileWatcher: Monitoring thread stopped") + defer close(w.updates) // Close the update stream after the FileWatcher has stopped for { event, ok := <-w.events @@ -162,40 +186,40 @@ func (w *watcher) monitorFunc() { updateEvent := convertEvent(event.Event()) if updateEvent == w.suspendEvent { w.suspendEvent = 0 - log.Debugf("Watcher: Skipping suspended event %s for path: %q", updateEvent, event.Path()) + log.Debugf("FileWatcher: Skipping suspended event %s for path: %q", updateEvent, event.Path()) continue // Skip the suspended event } // Suppress successive duplicate events registered in suppressDuplicates if suppressEvent(event.Path(), updateEvent) { - log.Debugf("Watcher: Skipping suppressed event %s for path: %q", updateEvent, event.Path()) + log.Debugf("FileWatcher: Skipping suppressed event %s for path: %q", updateEvent, event.Path()) continue // Skip the suppressed event } - // Directory bypass for watcher registration - // The watcher registration/deletion needs to be as fast as + // Directory bypass for FileWatcher registration + // The FileWatcher registration/deletion needs to be as fast as // possible, bypass the batcher when dealing with directories if w.handleDirEvent(event.Path(), updateEvent) { continue // The event path matched a directory, skip file processing for the event } // Get any events registered for the specific file, and append the specified event - var eventList update.Events + var eventList Events if val, ok := w.batcher.Load(event.Path()); ok { - eventList = val.(update.Events) + eventList = val.(Events) } eventList = append(eventList, updateEvent) // Register the event in the map, and dispatch all the events at once after the timeout w.batcher.Store(event.Path(), eventList) - log.Debugf("Watcher: Registered inotify events %v for path %q", eventList, event.Path()) + log.Debugf("FileWatcher: Registered inotify events %v for path %q", eventList, event.Path()) } } -func (w *watcher) dispatchFunc() { - log.Debug("Watcher: Dispatch thread started") - defer log.Debug("Watcher: Dispatch thread stopped") +func (w *FileWatcher) dispatchFunc() { + log.Debug("FileWatcher: Dispatch thread started") + defer log.Debug("FileWatcher: Dispatch thread stopped") for { // Wait until we have a batch dispatched to us @@ -203,7 +227,7 @@ func (w *watcher) dispatchFunc() { filePath := key.(string) // Concatenate all known events, and dispatch them to be handled one by one - for _, event := range concatenateEvents(val.(update.Events)) { + for _, event := range concatenateEvents(val.(Events)) { w.handleEvent(filePath, event) } @@ -211,47 +235,47 @@ func (w *watcher) dispatchFunc() { return true }) - log.Debug("Watcher: Dispatched events batch and reset the events cache") + log.Debug("FileWatcher: Dispatched events batch and reset the events cache") } } -func (w *watcher) handleEvent(filePath string, event update.Event) { +func (w *FileWatcher) handleEvent(filePath string, event Event) { switch event { - case update.EventCreate, update.EventDelete, update.EventModify: // Ignore EventNone + case EventCreate, EventDelete, EventModify: // Ignore EventNone // only care about valid files - if !validSuffix(filePath) { + if !w.validSuffix(filePath) { return } - log.Debugf("Watcher: Sending update: %s -> %q", event, filePath) - w.updates <- &update.FileUpdate{ + log.Debugf("FileWatcher: Sending update: %s -> %q", event, filePath) + w.updates <- &FileUpdate{ Event: event, Path: filePath, } } } -func (w *watcher) handleDirEvent(filePath string, event update.Event) (dir bool) { +func (w *FileWatcher) handleDirEvent(filePath string, event Event) (dir bool) { switch event { - case update.EventCreate: + case EventCreate: fi, err := os.Stat(filePath) if err != nil { - log.Errorf("Watcher: Failed to stat %q: %v", filePath, err) + log.Errorf("FileWatcher: Failed to stat %q: %v", filePath, err) return } if fi.IsDir() { if err := w.addWatch(filePath); err != nil { - log.Errorf("Watcher: Failed to add %q: %v", filePath, err) + log.Errorf("FileWatcher: Failed to add %q: %v", filePath, err) } dir = true } - case update.EventDelete: + case EventDelete: if w.hasWatch(filePath) { w.clear() if err := w.start(nil); err != nil { - log.Errorf("Watcher: Failed to re-initialize watches for %q", w.dir) + log.Errorf("FileWatcher: Failed to re-initialize watches for %q", w.dir) } dir = true @@ -261,14 +285,13 @@ func (w *watcher) handleDirEvent(filePath string, event update.Event) (dir bool) return } -// TODO: This watcher doesn't handle multiple operations on the same file well -// DELETE+CREATE+MODIFY => MODIFY -// CREATE+MODIFY => CREATE -// Fix this by caching the operations on the same file, and one second after all operations -// have been "written"; go through the changes and interpret the combinations of events properly -// This maybe will allow us to remove the "suspend" functionality? I don't know yet +// GetFileUpdateStream gets the channel with FileUpdates +func (w *FileWatcher) GetFileUpdateStream() FileUpdateStream { + return w.updates +} -func (w *watcher) close() { +// Close closes active underlying resources +func (w *FileWatcher) Close() { notify.Stop(w.events) w.batcher.Close() close(w.events) // Close the event stream @@ -276,24 +299,16 @@ func (w *watcher) close() { w.dispatcher.Wait() } -// This enables a one-time suspend of the given event, -// the watcher will skip the given event once -func (w *watcher) suspend(updateEvent update.Event) { +// Suspend enables a one-time suspend of the given event, +// the FileWatcher will skip the given event once +func (w *FileWatcher) Suspend(updateEvent Event) { w.suspendEvent = updateEvent } -func convertEvent(event notify.Event) update.Event { - if updateEvent, ok := eventMap[event]; ok { - return updateEvent - } - - return update.EventNone -} - // validSuffix is used to filter out all unsupported -// files based on the extensions in storage.Formats -func validSuffix(path string) bool { - for suffix := range storage.Formats { +// files based on the extensions given +func (w *FileWatcher) validSuffix(path string) bool { + for _, suffix := range w.opts.ValidExtensions { if filepath.Ext(path) == suffix { return true } @@ -302,9 +317,18 @@ func validSuffix(path string) bool { return false } +func convertEvent(event notify.Event) Event { + if updateEvent, ok := eventMap[event]; ok { + return updateEvent + } + + return EventNone +} + + // concatenateEvents takes in a slice of events and concatenates // all events possible based on combinedEvents -func concatenateEvents(events update.Events) update.Events { +func concatenateEvents(events Events) Events { if len(events) < 2 { return events // Quick return for 0 or 1 event } @@ -317,8 +341,8 @@ func concatenateEvents(events update.Events) update.Events { // Test if the prefix of the given events matches combinedEvent.input if bytes.Equal(events.Bytes()[:len(combinedEvent.input)], combinedEvent.input) { // If so, replace combinedEvent.input prefix in events with combinedEvent.output and recurse - concatenated := append(update.Events{combinedEvent.output}, events[len(combinedEvent.input):]...) - log.Tracef("Watcher: Concatenated events: %v -> %v", events, concatenated) + concatenated := append(Events{combinedEvent.output}, events[len(combinedEvent.input):]...) + log.Tracef("FileWatcher: Concatenated events: %v -> %v", events, concatenated) return concatenateEvents(concatenated) } } @@ -327,13 +351,13 @@ func concatenateEvents(events update.Events) update.Events { } var suppressCache struct { - event update.Event + event Event path string } // suppressEvent returns true it it's called twice // in a row with the same known event and path -func suppressEvent(path string, event update.Event) (s bool) { +func suppressEvent(path string, event Event) (s bool) { if _, ok := suppressDuplicates[event]; ok { if suppressCache.event == event && suppressCache.path == path { s = true diff --git a/pkg/util/watcher/filewatcher_test.go b/pkg/util/watcher/filewatcher_test.go new file mode 100644 index 000000000..bd914f863 --- /dev/null +++ b/pkg/util/watcher/filewatcher_test.go @@ -0,0 +1,47 @@ +package watcher + +import ( + "reflect" + "testing" +) + +var testEvents = []Events{ + { + EventDelete, + EventCreate, + EventModify, + }, + { + EventCreate, + EventModify, + EventDelete, + }, + { + EventCreate, + EventModify, + EventDelete, + EventCreate, + }, +} + +var targets = []Events{ + { + EventModify, + }, + { + EventNone, + }, + { + EventNone, + EventCreate, + }, +} + +func TestEventConcatenation(t *testing.T) { + for i, e := range testEvents { + result := concatenateEvents(e) + if !reflect.DeepEqual(result, targets[i]) { + t.Errorf("wrong concatenation result: %v != %v", result, targets[i]) + } + } +} From b1cc16f131503dbcaa9d0d12ffc9f5b97fc736bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20K=C3=A4ldstr=C3=B6m?= Date: Tue, 30 Jul 2019 17:27:23 +0300 Subject: [PATCH 4/7] make tidy --- pkg/gitops/gitops.go | 2 +- pkg/storage/watch/update/update.go | 2 +- pkg/util/watcher/event.go | 2 +- pkg/util/watcher/filewatcher.go | 15 +++++++-------- 4 files changed, 10 insertions(+), 11 deletions(-) diff --git a/pkg/gitops/gitops.go b/pkg/gitops/gitops.go index ef3487c1e..8ff969b32 100644 --- a/pkg/gitops/gitops.go +++ b/pkg/gitops/gitops.go @@ -13,8 +13,8 @@ import ( "github.com/weaveworks/ignite/pkg/operations" "github.com/weaveworks/ignite/pkg/storage/cache" "github.com/weaveworks/ignite/pkg/storage/manifest" - "github.com/weaveworks/ignite/pkg/util/watcher" "github.com/weaveworks/ignite/pkg/util" + "github.com/weaveworks/ignite/pkg/util/watcher" ) var ( diff --git a/pkg/storage/watch/update/update.go b/pkg/storage/watch/update/update.go index 804069189..d65c47ee7 100644 --- a/pkg/storage/watch/update/update.go +++ b/pkg/storage/watch/update/update.go @@ -19,4 +19,4 @@ type Update struct { type AssociatedUpdate struct { Update Storage storage.Storage -} \ No newline at end of file +} diff --git a/pkg/util/watcher/event.go b/pkg/util/watcher/event.go index b8b754e21..6090aa2b8 100644 --- a/pkg/util/watcher/event.go +++ b/pkg/util/watcher/event.go @@ -57,4 +57,4 @@ func (e Events) Bytes() []byte { type FileUpdate struct { Event Event Path string -} \ No newline at end of file +} diff --git a/pkg/util/watcher/filewatcher.go b/pkg/util/watcher/filewatcher.go index 6757b1dac..863e49c20 100644 --- a/pkg/util/watcher/filewatcher.go +++ b/pkg/util/watcher/filewatcher.go @@ -11,7 +11,7 @@ import ( "github.com/weaveworks/ignite/pkg/util/sync" ) -const eventBuffer = 4096 // How many events and updates we can buffer before watching is interrupted +const eventBuffer = 4096 // How many events and updates we can buffer before watching is interrupted var listenEvents = []notify.Event{notify.InCreate, notify.InDelete, notify.InDeleteSelf, notify.InCloseWrite} var eventMap = map[notify.Event]Event{ @@ -22,8 +22,8 @@ var eventMap = map[notify.Event]Event{ // combinedEvent describes multiple events that should be concatenated into a single event type combinedEvent struct { - input []byte // input is a slice of events to match (in bytes, it speeds up the comparison) - output Event // output is the resulting event that should be returned + input []byte // input is a slice of events to match (in bytes, it speeds up the comparison) + output Event // output is the resulting event that should be returned } // combinedEvents describes the event combinations to concatenate, @@ -61,8 +61,8 @@ type Options struct { // DefaultOptions returns the default options func DefaultOptions() Options { return Options{ - ExcludeDirs: []string{".git"}, - BatchTimeout: 1 * time.Second, + ExcludeDirs: []string{".git"}, + BatchTimeout: 1 * time.Second, ValidExtensions: []string{".yaml", ".yml", ".json"}, } } @@ -83,7 +83,7 @@ func NewFileWatcherWithOptions(dir string, opts Options) (w *FileWatcher, files events: make(eventStream, eventBuffer), updates: make(FileUpdateStream, eventBuffer), batcher: sync.NewWriteBatcher(opts.BatchTimeout), - opts: opts, + opts: opts, } if err = w.start(&files); err != nil { @@ -108,7 +108,7 @@ type FileWatcher struct { suspendEvent Event monitor *sync.Monitor dispatcher *sync.Monitor - opts Options + opts Options // the batcher is used for properly sending many concurrent inotify events // as a group, after a specified timeout. This fixes the issue of one single // file operation being registered as many different inotify events @@ -325,7 +325,6 @@ func convertEvent(event notify.Event) Event { return EventNone } - // concatenateEvents takes in a slice of events and concatenates // all events possible based on combinedEvents func concatenateEvents(events Events) Events { From a653ecc8434421e99cddf2fba9df1e374b33e7c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20K=C3=A4ldstr=C3=B6m?= Date: Wed, 31 Jul 2019 11:54:38 +0300 Subject: [PATCH 5/7] rename to BatchWriter, better-sounding --- pkg/util/sync/batcher.go | 32 ++++++++++++++++---------------- pkg/util/sync/batcher_test.go | 4 ++-- pkg/util/watcher/filewatcher.go | 4 ++-- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/pkg/util/sync/batcher.go b/pkg/util/sync/batcher.go index a6f52aaa8..d3855b254 100644 --- a/pkg/util/sync/batcher.go +++ b/pkg/util/sync/batcher.go @@ -7,22 +7,22 @@ import ( log "github.com/sirupsen/logrus" ) -// NewWriteBatcher creates a new WriteBatcher -func NewWriteBatcher(duration time.Duration) *WriteBatcher { - return &WriteBatcher{ +// NewBatchWriter creates a new BatchWriter +func NewBatchWriter(duration time.Duration) *BatchWriter { + return &BatchWriter{ duration: duration, flushCh: make(chan struct{}), syncMap: &sync.Map{}, } } -// WriteBatcher is a struct that wraps a concurrent sync.Map +// BatchWriter is a struct that wraps a concurrent sync.Map // and dispatches all writes to it at once, a specific // duration (e.g. 1s) after the last write was performed. This // allows for 100s of concurrent writes in milliseconds to the // same map in one sending goroutine; and one receiving goroutine // which can process the result after all the writes are done. -type WriteBatcher struct { +type BatchWriter struct { duration time.Duration timer *time.Timer flushCh chan struct{} @@ -30,26 +30,26 @@ type WriteBatcher struct { } // Load reads the key from the map -func (b *WriteBatcher) Load(key interface{}) (value interface{}, ok bool) { +func (b *BatchWriter) Load(key interface{}) (value interface{}, ok bool) { return b.syncMap.Load(key) } // Store writes the value for the specified key to the map // If no other .Store call is made during the specified duration, // flushCh is invoked and ProcessBatch unblocks in the other goroutine -func (b *WriteBatcher) Store(key, value interface{}) { +func (b *BatchWriter) Store(key, value interface{}) { // prevent the timer from firing as we're manipulating it now b.cancelUnfiredTimer() // store the key and the value as requested - log.Tracef("WriteBatcher: Storing key %v and value %q, reset the timer.", key, value) + log.Tracef("BatchWriter: Storing key %v and value %q, reset the timer.", key, value) b.syncMap.Store(key, value) // set the timer to fire after the duration, unless there's a new .Store call b.dispatchAfterTimeout() } // Close closes the underlying channel -func (b *WriteBatcher) Close() { - log.Trace("WriteBatcher: Closing the batch channel") +func (b *BatchWriter) Close() { + log.Trace("BatchWriter: Closing the batch channel") close(b.flushCh) } @@ -57,29 +57,29 @@ func (b *WriteBatcher) Close() { // released. This should be used in the receiving goroutine. The internal map is // reset after this call, so be sure to capture all the contents if needed. This // function returns false if Close() has been called. -func (b *WriteBatcher) ProcessBatch(fn func(key, val interface{}) bool) bool { +func (b *BatchWriter) ProcessBatch(fn func(key, val interface{}) bool) bool { if _, ok := <-b.flushCh; !ok { // channel is closed return false } - log.Trace("WriteBatcher: Received a flush for the batch. Dispatching it now.") + log.Trace("BatchWriter: Received a flush for the batch. Dispatching it now.") b.syncMap.Range(fn) *b.syncMap = sync.Map{} return true } -func (b *WriteBatcher) cancelUnfiredTimer() { +func (b *BatchWriter) cancelUnfiredTimer() { // If the timer already exists; stop it if b.timer != nil { - log.Tracef("WriteBatcher: Cancelled timer") + log.Tracef("BatchWriter: Cancelled timer") b.timer.Stop() b.timer = nil } } -func (b *WriteBatcher) dispatchAfterTimeout() { +func (b *BatchWriter) dispatchAfterTimeout() { b.timer = time.AfterFunc(b.duration, func() { - log.Tracef("WriteBatcher: Dispatching a batch job") + log.Tracef("BatchWriter: Dispatching a batch job") b.flushCh <- struct{}{} }) } diff --git a/pkg/util/sync/batcher_test.go b/pkg/util/sync/batcher_test.go index 0d851aae9..ff4e7f786 100644 --- a/pkg/util/sync/batcher_test.go +++ b/pkg/util/sync/batcher_test.go @@ -14,9 +14,9 @@ type job struct { event string } -func TestWriteBatcher(t *testing.T) { +func TestBatchWriter(t *testing.T) { ch := make(chan job) - b := NewWriteBatcher(1 * time.Second) + b := NewBatchWriter(1 * time.Second) go func() { for i := 0; i < 10; i++ { fmt.Println(i) diff --git a/pkg/util/watcher/filewatcher.go b/pkg/util/watcher/filewatcher.go index 863e49c20..c4e6d9f54 100644 --- a/pkg/util/watcher/filewatcher.go +++ b/pkg/util/watcher/filewatcher.go @@ -82,7 +82,7 @@ func NewFileWatcherWithOptions(dir string, opts Options) (w *FileWatcher, files dir: dir, events: make(eventStream, eventBuffer), updates: make(FileUpdateStream, eventBuffer), - batcher: sync.NewWriteBatcher(opts.BatchTimeout), + batcher: sync.NewBatchWriter(opts.BatchTimeout), opts: opts, } @@ -112,7 +112,7 @@ type FileWatcher struct { // the batcher is used for properly sending many concurrent inotify events // as a group, after a specified timeout. This fixes the issue of one single // file operation being registered as many different inotify events - batcher *sync.WriteBatcher + batcher *sync.BatchWriter } func (w *FileWatcher) addWatch(path string) (err error) { From e5835ee611ff3ec720c27f416835a1feb2645a73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20K=C3=A4ldstr=C3=B6m?= Date: Wed, 31 Jul 2019 11:57:41 +0300 Subject: [PATCH 6/7] fix merge conflict --- pkg/util/watcher/filewatcher.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/util/watcher/filewatcher.go b/pkg/util/watcher/filewatcher.go index c4e6d9f54..d114d64ab 100644 --- a/pkg/util/watcher/filewatcher.go +++ b/pkg/util/watcher/filewatcher.go @@ -223,7 +223,7 @@ func (w *FileWatcher) dispatchFunc() { for { // Wait until we have a batch dispatched to us - w.batcher.ProcessBatch(func(key, val interface{}) bool { + ok := w.batcher.ProcessBatch(func(key, val interface{}) bool { filePath := key.(string) // Concatenate all known events, and dispatch them to be handled one by one @@ -234,6 +234,9 @@ func (w *FileWatcher) dispatchFunc() { // Continue traversing the map return true }) + if !ok { + return // The BatchWriter channel is closed, stop processing + } log.Debug("FileWatcher: Dispatched events batch and reset the events cache") } From 1d6077f5699c6896bee2a8b9068381a2ace74681 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lucas=20K=C3=A4ldstr=C3=B6m?= Date: Wed, 31 Jul 2019 11:58:50 +0300 Subject: [PATCH 7/7] remove unused unit test --- pkg/util/watcher/event_test.go | 18 ------------------ 1 file changed, 18 deletions(-) delete mode 100644 pkg/util/watcher/event_test.go diff --git a/pkg/util/watcher/event_test.go b/pkg/util/watcher/event_test.go deleted file mode 100644 index 6d6c31cac..000000000 --- a/pkg/util/watcher/event_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package watcher - -import ( - "reflect" - "sort" - "testing" -) - -func TestEventSort(t *testing.T) { - e1 := Events{EventDelete, EventModify, EventCreate} - e2 := Events{EventCreate, EventDelete, EventModify} - - sort.Sort(e1) - sort.Sort(e2) - if !reflect.DeepEqual(e1, e2) { - t.Errorf("events do not match: %v %v", e1, e2) - } -}