Skip to content
This repository has been archived by the owner on Dec 7, 2023. It is now read-only.

FileWatcher: Support internal moves without re-creating and multiple active moves at once #341

Merged
merged 2 commits into from
Aug 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/storage/sync/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ func (ss *SyncStorage) monitorFunc() {
// For now, only update the state on write when Ignite is running
for {
upd, ok := <-ss.eventStream
log.Debugf("SyncStorage: Received update %v %t", upd, ok)
if ok {
log.Debugf("SyncStorage: Received update %v %t", upd, ok)
switch upd.Event {
case update.ObjectEventModify, update.ObjectEventCreate:
// First load the Object using the Storage given in the update,
Expand Down
10 changes: 10 additions & 0 deletions pkg/storage/watch/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,16 @@ func (s *GenericWatchStorage) monitorFunc(raw storage.RawStorage, files []string
continue
}

if event.Event == watcher.FileEventMove {
// Update the mappings for the moved file (AddMapping overwrites)
if mapped, ok := raw.(storage.MappedRawStorage); ok {
mapped.AddMapping(storage.NewKey(obj.GetKind(), obj.GetUID()), event.Path)
}

// Internal move events are a no-op
continue
}

// This is based on the key's existence instead of watcher.EventCreate,
// as Objects can get updated (via watcher.FileEventModify) to be conformant
if _, err = raw.GetKey(event.Path); err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/util/watcher/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"strings"
)

// FileEvent is an enum describing a change in a file's state.
// FileEvent is an enum describing a change in a file's state
type FileEvent byte

const (
FileEventNone FileEvent = iota // 0
FileEventModify // 1
FileEventDelete // 2
FileEventMove // 3
)

func (e FileEvent) String() string {
Expand All @@ -22,6 +23,8 @@ func (e FileEvent) String() string {
return "MODIFY"
case 2:
return "DELETE"
case 3:
return "MOVE"
}

return "UNKNOWN"
Expand Down
93 changes: 73 additions & 20 deletions pkg/util/watcher/filewatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (w *FileWatcher) dispatchFunc() {
// Wait until we have a batch dispatched to us
ok := w.batcher.ProcessBatch(func(key, val interface{}) bool {
// Concatenate all known events, and dispatch them to be handled one by one
for _, event := range concatenateEvents(val.(notifyEvents)) {
for _, event := range w.concatenateEvents(val.(notifyEvents)) {
w.sendUpdate(event)
}

Expand Down Expand Up @@ -283,39 +283,90 @@ func convertUpdate(event notify.EventInfo) *FileUpdate {
}

// moveCache caches an event during a move operation
var moveCache notify.EventInfo
// and dispatches a FileUpdate if it's not cancelled
type moveCache struct {
watcher *FileWatcher
event notify.EventInfo
timer *time.Timer
}

func (w *FileWatcher) newMoveCache(event notify.EventInfo) *moveCache {
m := &moveCache{
watcher: w,
event: event,
}

// moveCaches wait one second to be cancelled before firing
m.timer = time.AfterFunc(time.Second, m.incomplete)
return m
}

func (m *moveCache) cookie() uint32 {
return ievent(m.event).Cookie
}

// If the moveCache isn't cancelled, the move is considered incomplete and this
// method is fired. A complete move consists out of a "from" event and a "to" event,
// if only one is received, the file is moved in/out of a watched directory, which
// is treated as a normal creation/deletion by this method.
func (m *moveCache) incomplete() {
var event FileEvent

switch m.event.Event() {
case notify.InMovedFrom:
event = FileEventDelete
case notify.InMovedTo:
event = FileEventModify
default:
// This should never happen
panic(fmt.Sprintf("moveCache: unrecognized event: %v", m.event.Event()))
}

log.Tracef("moveCache: Timer expired for %d, dispatching...", m.cookie())
m.watcher.sendUpdate(&FileUpdate{event, m.event.Path()})

// Delete the cache after the timer has fired
delete(moveCaches, m.cookie())
}

func (m *moveCache) cancel() {
m.timer.Stop()
delete(moveCaches, m.cookie())
log.Tracef("moveCache: Dispatching cancelled for %d", m.cookie())
}

// moveCaches keeps track of active moves by cookie
var moveCaches = make(map[uint32]*moveCache)

// move processes InMovedFrom and InMovedTo events in any order
// and converts them to FileEvents when a move is detected
func move(event notify.EventInfo) (updates FileUpdates) {
if moveCache == nil {
moveCache = event
// and dispatches FileUpdates when a move is detected
func (w *FileWatcher) move(event notify.EventInfo) (moveUpdate *FileUpdate) {
cookie := ievent(event).Cookie
cache, ok := moveCaches[cookie]
if !ok {
// The cookie is not cached, create a new cache object for it
moveCaches[cookie] = w.newMoveCache(event)
return
}

deletePath, modifyPath := moveCache.Path(), event.Path()
sourcePath, destPath := cache.event.Path(), event.Path()
switch event.Event() {
case notify.InMovedFrom:
deletePath, modifyPath = modifyPath, deletePath
sourcePath, destPath = destPath, sourcePath
fallthrough
case notify.InMovedTo:
if ievent(moveCache).Cookie == ievent(event).Cookie {
updates = append(updates, &FileUpdate{FileEventDelete, deletePath},
&FileUpdate{FileEventModify, modifyPath})
moveCache = nil
} else {
moveCache = event
}
cache.cancel() // Cancel dispatching the cache's incomplete move
moveUpdate = &FileUpdate{FileEventMove, destPath} // Register an internal, complete move instead
log.Tracef("FileWatcher: Detected move: %q -> %q", sourcePath, destPath)
}

log.Tracef("FileWatcher: Detected move: %q -> %q", deletePath, modifyPath)
return
}

// concatenateEvents takes in a slice of events and concatenates
// all events possible based on combinedEvents. It also manages
// file moving and conversion from notifyEvents to FileUpdates
func concatenateEvents(events notifyEvents) FileUpdates {
func (w *FileWatcher) concatenateEvents(events notifyEvents) FileUpdates {
for _, combinedEvent := range combinedEvents {
// Test if the prefix of the given events matches combinedEvent.input
if event, ok := combinedEvent.match(events); ok {
Expand All @@ -326,7 +377,7 @@ func concatenateEvents(events notifyEvents) FileUpdates {
}

log.Tracef("FileWatcher: Concatenated events: %v -> %v", events, concatenated)
return concatenateEvents(concatenated)
return w.concatenateEvents(concatenated)
}
}

Expand All @@ -335,8 +386,10 @@ func concatenateEvents(events notifyEvents) FileUpdates {
for _, event := range events {
switch event.Event() {
case notify.InMovedFrom, notify.InMovedTo:
if moveUpdates := move(event); len(moveUpdates) > 0 {
updates = append(updates, moveUpdates...)
// Send move-related events to w.move
if update := w.move(event); update != nil {
// Add the update to the list if we get something back
updates = append(updates, update)
}
default:
updates = append(updates, convertUpdate(event))
Expand Down