diff --git a/pkg/storage/sync/storage.go b/pkg/storage/sync/storage.go index fd2f58814..850c2ac57 100644 --- a/pkg/storage/sync/storage.go +++ b/pkg/storage/sync/storage.go @@ -139,8 +139,8 @@ func (ss *SyncStorage) monitorFunc() { // For now, only update the state on write when Ignite is running for { upd, ok := <-ss.eventStream - log.Debugf("SyncStorage: Received update %v %t", upd, ok) if ok { + log.Debugf("SyncStorage: Received update %v %t", upd, ok) switch upd.Event { case update.ObjectEventModify, update.ObjectEventCreate: // First load the Object using the Storage given in the update, diff --git a/pkg/storage/watch/storage.go b/pkg/storage/watch/storage.go index f235284c3..82f3a5930 100644 --- a/pkg/storage/watch/storage.go +++ b/pkg/storage/watch/storage.go @@ -142,6 +142,16 @@ func (s *GenericWatchStorage) monitorFunc(raw storage.RawStorage, files []string continue } + if event.Event == watcher.FileEventMove { + // Update the mappings for the moved file (AddMapping overwrites) + if mapped, ok := raw.(storage.MappedRawStorage); ok { + mapped.AddMapping(storage.NewKey(obj.GetKind(), obj.GetUID()), event.Path) + } + + // Internal move events are a no-op + continue + } + // This is based on the key's existence instead of watcher.EventCreate, // as Objects can get updated (via watcher.FileEventModify) to be conformant if _, err = raw.GetKey(event.Path); err != nil { diff --git a/pkg/util/watcher/event.go b/pkg/util/watcher/event.go index eea8eef9b..4da933d79 100644 --- a/pkg/util/watcher/event.go +++ b/pkg/util/watcher/event.go @@ -5,13 +5,14 @@ import ( "strings" ) -// FileEvent is an enum describing a change in a file's state. +// FileEvent is an enum describing a change in a file's state type FileEvent byte const ( FileEventNone FileEvent = iota // 0 FileEventModify // 1 FileEventDelete // 2 + FileEventMove // 3 ) func (e FileEvent) String() string { @@ -22,6 +23,8 @@ func (e FileEvent) String() string { return "MODIFY" case 2: return "DELETE" + case 3: + return "MOVE" } return "UNKNOWN" diff --git a/pkg/util/watcher/filewatcher.go b/pkg/util/watcher/filewatcher.go index 9238e9f19..e55e2ef14 100644 --- a/pkg/util/watcher/filewatcher.go +++ b/pkg/util/watcher/filewatcher.go @@ -198,7 +198,7 @@ func (w *FileWatcher) dispatchFunc() { // Wait until we have a batch dispatched to us ok := w.batcher.ProcessBatch(func(key, val interface{}) bool { // Concatenate all known events, and dispatch them to be handled one by one - for _, event := range concatenateEvents(val.(notifyEvents)) { + for _, event := range w.concatenateEvents(val.(notifyEvents)) { w.sendUpdate(event) } @@ -283,39 +283,90 @@ func convertUpdate(event notify.EventInfo) *FileUpdate { } // moveCache caches an event during a move operation -var moveCache notify.EventInfo +// and dispatches a FileUpdate if it's not cancelled +type moveCache struct { + watcher *FileWatcher + event notify.EventInfo + timer *time.Timer +} + +func (w *FileWatcher) newMoveCache(event notify.EventInfo) *moveCache { + m := &moveCache{ + watcher: w, + event: event, + } + + // moveCaches wait one second to be cancelled before firing + m.timer = time.AfterFunc(time.Second, m.incomplete) + return m +} + +func (m *moveCache) cookie() uint32 { + return ievent(m.event).Cookie +} + +// If the moveCache isn't cancelled, the move is considered incomplete and this +// method is fired. A complete move consists out of a "from" event and a "to" event, +// if only one is received, the file is moved in/out of a watched directory, which +// is treated as a normal creation/deletion by this method. +func (m *moveCache) incomplete() { + var event FileEvent + + switch m.event.Event() { + case notify.InMovedFrom: + event = FileEventDelete + case notify.InMovedTo: + event = FileEventModify + default: + // This should never happen + panic(fmt.Sprintf("moveCache: unrecognized event: %v", m.event.Event())) + } + + log.Tracef("moveCache: Timer expired for %d, dispatching...", m.cookie()) + m.watcher.sendUpdate(&FileUpdate{event, m.event.Path()}) + + // Delete the cache after the timer has fired + delete(moveCaches, m.cookie()) +} + +func (m *moveCache) cancel() { + m.timer.Stop() + delete(moveCaches, m.cookie()) + log.Tracef("moveCache: Dispatching cancelled for %d", m.cookie()) +} + +// moveCaches keeps track of active moves by cookie +var moveCaches = make(map[uint32]*moveCache) // move processes InMovedFrom and InMovedTo events in any order -// and converts them to FileEvents when a move is detected -func move(event notify.EventInfo) (updates FileUpdates) { - if moveCache == nil { - moveCache = event +// and dispatches FileUpdates when a move is detected +func (w *FileWatcher) move(event notify.EventInfo) (moveUpdate *FileUpdate) { + cookie := ievent(event).Cookie + cache, ok := moveCaches[cookie] + if !ok { + // The cookie is not cached, create a new cache object for it + moveCaches[cookie] = w.newMoveCache(event) return } - deletePath, modifyPath := moveCache.Path(), event.Path() + sourcePath, destPath := cache.event.Path(), event.Path() switch event.Event() { case notify.InMovedFrom: - deletePath, modifyPath = modifyPath, deletePath + sourcePath, destPath = destPath, sourcePath fallthrough case notify.InMovedTo: - if ievent(moveCache).Cookie == ievent(event).Cookie { - updates = append(updates, &FileUpdate{FileEventDelete, deletePath}, - &FileUpdate{FileEventModify, modifyPath}) - moveCache = nil - } else { - moveCache = event - } + cache.cancel() // Cancel dispatching the cache's incomplete move + moveUpdate = &FileUpdate{FileEventMove, destPath} // Register an internal, complete move instead + log.Tracef("FileWatcher: Detected move: %q -> %q", sourcePath, destPath) } - log.Tracef("FileWatcher: Detected move: %q -> %q", deletePath, modifyPath) return } // concatenateEvents takes in a slice of events and concatenates // all events possible based on combinedEvents. It also manages // file moving and conversion from notifyEvents to FileUpdates -func concatenateEvents(events notifyEvents) FileUpdates { +func (w *FileWatcher) concatenateEvents(events notifyEvents) FileUpdates { for _, combinedEvent := range combinedEvents { // Test if the prefix of the given events matches combinedEvent.input if event, ok := combinedEvent.match(events); ok { @@ -326,7 +377,7 @@ func concatenateEvents(events notifyEvents) FileUpdates { } log.Tracef("FileWatcher: Concatenated events: %v -> %v", events, concatenated) - return concatenateEvents(concatenated) + return w.concatenateEvents(concatenated) } } @@ -335,8 +386,10 @@ func concatenateEvents(events notifyEvents) FileUpdates { for _, event := range events { switch event.Event() { case notify.InMovedFrom, notify.InMovedTo: - if moveUpdates := move(event); len(moveUpdates) > 0 { - updates = append(updates, moveUpdates...) + // Send move-related events to w.move + if update := w.move(event); update != nil { + // Add the update to the list if we get something back + updates = append(updates, update) } default: updates = append(updates, convertUpdate(event))