Skip to content
This repository has been archived by the owner on Dec 7, 2023. It is now read-only.

Commit

Permalink
Merge pull request #341 from twelho/filewatcher-move
Browse files Browse the repository at this point in the history
FileWatcher: Support internal moves without re-creating and multiple active moves at once
  • Loading branch information
twelho authored Aug 16, 2019
2 parents 9e27c5e + 440e2c8 commit fbfc446
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pkg/storage/sync/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ func (ss *SyncStorage) monitorFunc() {
// For now, only update the state on write when Ignite is running
for {
upd, ok := <-ss.eventStream
log.Debugf("SyncStorage: Received update %v %t", upd, ok)
if ok {
log.Debugf("SyncStorage: Received update %v %t", upd, ok)
switch upd.Event {
case update.ObjectEventModify, update.ObjectEventCreate:
// First load the Object using the Storage given in the update,
Expand Down
10 changes: 10 additions & 0 deletions pkg/storage/watch/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,16 @@ func (s *GenericWatchStorage) monitorFunc(raw storage.RawStorage, files []string
continue
}

if event.Event == watcher.FileEventMove {
// Update the mappings for the moved file (AddMapping overwrites)
if mapped, ok := raw.(storage.MappedRawStorage); ok {
mapped.AddMapping(storage.NewKey(obj.GetKind(), obj.GetUID()), event.Path)
}

// Internal move events are a no-op
continue
}

// This is based on the key's existence instead of watcher.EventCreate,
// as Objects can get updated (via watcher.FileEventModify) to be conformant
if _, err = raw.GetKey(event.Path); err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/util/watcher/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"strings"
)

// FileEvent is an enum describing a change in a file's state.
// FileEvent is an enum describing a change in a file's state
type FileEvent byte

const (
FileEventNone FileEvent = iota // 0
FileEventModify // 1
FileEventDelete // 2
FileEventMove // 3
)

func (e FileEvent) String() string {
Expand All @@ -22,6 +23,8 @@ func (e FileEvent) String() string {
return "MODIFY"
case 2:
return "DELETE"
case 3:
return "MOVE"
}

return "UNKNOWN"
Expand Down
93 changes: 73 additions & 20 deletions pkg/util/watcher/filewatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (w *FileWatcher) dispatchFunc() {
// Wait until we have a batch dispatched to us
ok := w.batcher.ProcessBatch(func(key, val interface{}) bool {
// Concatenate all known events, and dispatch them to be handled one by one
for _, event := range concatenateEvents(val.(notifyEvents)) {
for _, event := range w.concatenateEvents(val.(notifyEvents)) {
w.sendUpdate(event)
}

Expand Down Expand Up @@ -283,39 +283,90 @@ func convertUpdate(event notify.EventInfo) *FileUpdate {
}

// moveCache caches an event during a move operation
var moveCache notify.EventInfo
// and dispatches a FileUpdate if it's not cancelled
type moveCache struct {
watcher *FileWatcher
event notify.EventInfo
timer *time.Timer
}

func (w *FileWatcher) newMoveCache(event notify.EventInfo) *moveCache {
m := &moveCache{
watcher: w,
event: event,
}

// moveCaches wait one second to be cancelled before firing
m.timer = time.AfterFunc(time.Second, m.incomplete)
return m
}

func (m *moveCache) cookie() uint32 {
return ievent(m.event).Cookie
}

// If the moveCache isn't cancelled, the move is considered incomplete and this
// method is fired. A complete move consists out of a "from" event and a "to" event,
// if only one is received, the file is moved in/out of a watched directory, which
// is treated as a normal creation/deletion by this method.
func (m *moveCache) incomplete() {
var event FileEvent

switch m.event.Event() {
case notify.InMovedFrom:
event = FileEventDelete
case notify.InMovedTo:
event = FileEventModify
default:
// This should never happen
panic(fmt.Sprintf("moveCache: unrecognized event: %v", m.event.Event()))
}

log.Tracef("moveCache: Timer expired for %d, dispatching...", m.cookie())
m.watcher.sendUpdate(&FileUpdate{event, m.event.Path()})

// Delete the cache after the timer has fired
delete(moveCaches, m.cookie())
}

func (m *moveCache) cancel() {
m.timer.Stop()
delete(moveCaches, m.cookie())
log.Tracef("moveCache: Dispatching cancelled for %d", m.cookie())
}

// moveCaches keeps track of active moves by cookie
var moveCaches = make(map[uint32]*moveCache)

// move processes InMovedFrom and InMovedTo events in any order
// and converts them to FileEvents when a move is detected
func move(event notify.EventInfo) (updates FileUpdates) {
if moveCache == nil {
moveCache = event
// and dispatches FileUpdates when a move is detected
func (w *FileWatcher) move(event notify.EventInfo) (moveUpdate *FileUpdate) {
cookie := ievent(event).Cookie
cache, ok := moveCaches[cookie]
if !ok {
// The cookie is not cached, create a new cache object for it
moveCaches[cookie] = w.newMoveCache(event)
return
}

deletePath, modifyPath := moveCache.Path(), event.Path()
sourcePath, destPath := cache.event.Path(), event.Path()
switch event.Event() {
case notify.InMovedFrom:
deletePath, modifyPath = modifyPath, deletePath
sourcePath, destPath = destPath, sourcePath
fallthrough
case notify.InMovedTo:
if ievent(moveCache).Cookie == ievent(event).Cookie {
updates = append(updates, &FileUpdate{FileEventDelete, deletePath},
&FileUpdate{FileEventModify, modifyPath})
moveCache = nil
} else {
moveCache = event
}
cache.cancel() // Cancel dispatching the cache's incomplete move
moveUpdate = &FileUpdate{FileEventMove, destPath} // Register an internal, complete move instead
log.Tracef("FileWatcher: Detected move: %q -> %q", sourcePath, destPath)
}

log.Tracef("FileWatcher: Detected move: %q -> %q", deletePath, modifyPath)
return
}

// concatenateEvents takes in a slice of events and concatenates
// all events possible based on combinedEvents. It also manages
// file moving and conversion from notifyEvents to FileUpdates
func concatenateEvents(events notifyEvents) FileUpdates {
func (w *FileWatcher) concatenateEvents(events notifyEvents) FileUpdates {
for _, combinedEvent := range combinedEvents {
// Test if the prefix of the given events matches combinedEvent.input
if event, ok := combinedEvent.match(events); ok {
Expand All @@ -326,7 +377,7 @@ func concatenateEvents(events notifyEvents) FileUpdates {
}

log.Tracef("FileWatcher: Concatenated events: %v -> %v", events, concatenated)
return concatenateEvents(concatenated)
return w.concatenateEvents(concatenated)
}
}

Expand All @@ -335,8 +386,10 @@ func concatenateEvents(events notifyEvents) FileUpdates {
for _, event := range events {
switch event.Event() {
case notify.InMovedFrom, notify.InMovedTo:
if moveUpdates := move(event); len(moveUpdates) > 0 {
updates = append(updates, moveUpdates...)
// Send move-related events to w.move
if update := w.move(event); update != nil {
// Add the update to the list if we get something back
updates = append(updates, update)
}
default:
updates = append(updates, convertUpdate(event))
Expand Down

0 comments on commit fbfc446

Please sign in to comment.