Skip to content
This repository has been archived by the owner on Dec 7, 2023. It is now read-only.

Extract watcher, batcher and monitor into pkg/util #245

Merged
merged 8 commits into from
Jul 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions pkg/gitops/gitops.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/weaveworks/ignite/pkg/operations"
"github.com/weaveworks/ignite/pkg/storage/cache"
"github.com/weaveworks/ignite/pkg/storage/manifest"
"github.com/weaveworks/ignite/pkg/storage/watch/update"
"github.com/weaveworks/ignite/pkg/util"
"github.com/weaveworks/ignite/pkg/util/watcher"
)

var (
Expand Down Expand Up @@ -58,7 +58,7 @@ func RunLoop(url, branch string, paths []string) error {
}

var vm *api.VM
if upd.Event == update.EventDelete {
if upd.Event == watcher.EventDelete {
// As we know this VM was deleted, it wouldn't show up in a Get() call
// Construct a temporary VM object for passing to the delete function
vm = &api.VM{
Expand Down Expand Up @@ -86,15 +86,15 @@ func RunLoop(url, branch string, paths []string) error {

// TODO: Paralellization
switch upd.Event {
case update.EventCreate:
case watcher.EventCreate:
runHandle(func() error {
return handleCreate(vm)
})
case update.EventModify:
case watcher.EventModify:
runHandle(func() error {
return handleChange(vm)
})
case update.EventDelete:
case watcher.EventDelete:
runHandle(func() error {
// TODO: Temporary VM Object for removal
return handleDelete(vm)
Expand Down
11 changes: 6 additions & 5 deletions pkg/storage/sync/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (
"github.com/weaveworks/ignite/pkg/storage"
"github.com/weaveworks/ignite/pkg/storage/watch"
"github.com/weaveworks/ignite/pkg/storage/watch/update"
"github.com/weaveworks/ignite/pkg/util"
"github.com/weaveworks/ignite/pkg/util/sync"
"github.com/weaveworks/ignite/pkg/util/watcher"
"k8s.io/apimachinery/pkg/runtime/schema"
)

Expand All @@ -28,7 +29,7 @@ type SyncStorage struct {
storages []storage.Storage
eventStream watch.AssociatedEventStream
updateStream UpdateStream
monitor *util.Monitor
monitor *sync.Monitor
}

var _ storage.Storage = &SyncStorage{}
Expand All @@ -48,7 +49,7 @@ func NewSyncStorage(rwStorage storage.Storage, wStorages ...storage.Storage) sto
}

if ss.eventStream != nil {
ss.monitor = util.RunMonitor(ss.monitorFunc)
ss.monitor = sync.RunMonitor(ss.monitorFunc)
}

return ss
Expand Down Expand Up @@ -141,7 +142,7 @@ func (ss *SyncStorage) monitorFunc() {
log.Debugf("SyncStorage: Received update %v %t", upd, ok)
if ok {
switch upd.Event {
case update.EventModify, update.EventCreate:
case watcher.EventModify, watcher.EventCreate:
// First load the Object using the Storage given in the update,
// then set it using the client constructed above
updClient := client.NewClient(upd.Storage).Dynamic(upd.APIType.GetKind())
Expand All @@ -155,7 +156,7 @@ func (ss *SyncStorage) monitorFunc() {
log.Errorf("Failed to set Object with UID %q: %v", upd.APIType.GetUID(), err)
continue
}
case update.EventDelete:
case watcher.EventDelete:
// For deletion we use the generated "fake" APIType object
if err := c.Dynamic(upd.APIType.GetKind()).Delete(upd.APIType.GetUID()); err != nil {
log.Errorf("Failed to delete Object with UID %q: %v", upd.APIType.GetUID(), err)
Expand Down
69 changes: 0 additions & 69 deletions pkg/storage/watch/batcher.go

This file was deleted.

31 changes: 16 additions & 15 deletions pkg/storage/watch/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
"github.com/weaveworks/ignite/pkg/storage"
"github.com/weaveworks/ignite/pkg/storage/manifest/raw"
"github.com/weaveworks/ignite/pkg/storage/watch/update"
"github.com/weaveworks/ignite/pkg/util"
"github.com/weaveworks/ignite/pkg/util/sync"
"github.com/weaveworks/ignite/pkg/util/watcher"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/yaml"
Expand Down Expand Up @@ -40,12 +41,12 @@ func NewGenericWatchStorage(storage storage.Storage) (WatchStorage, error) {

var err error
var files []string
if s.watcher, files, err = newWatcher(storage.RawStorage().Dir()); err != nil {
if s.watcher, files, err = watcher.NewFileWatcher(storage.RawStorage().Dir()); err != nil {
return nil, err
}

if mapped, ok := s.RawStorage().(raw.MappedRawStorage); ok {
s.monitor = util.RunMonitor(func() {
s.monitor = sync.RunMonitor(func() {
s.monitorFunc(mapped, files) // Offload the file registration to the goroutine
})
}
Expand All @@ -56,28 +57,28 @@ func NewGenericWatchStorage(storage storage.Storage) (WatchStorage, error) {
// GenericWatchStorage implements the WatchStorage interface
type GenericWatchStorage struct {
storage.Storage
watcher *watcher
watcher *watcher.FileWatcher
events *AssociatedEventStream
monitor *util.Monitor
monitor *sync.Monitor
}

var _ WatchStorage = &GenericWatchStorage{}

// Suspend modify events during Set
func (s *GenericWatchStorage) Set(gvk schema.GroupVersionKind, obj meta.Object) error {
s.watcher.suspend(update.EventModify)
s.watcher.Suspend(watcher.EventModify)
return s.Storage.Set(gvk, obj)
}

// Suspend modify events during Patch
func (s *GenericWatchStorage) Patch(gvk schema.GroupVersionKind, uid meta.UID, patch []byte) error {
s.watcher.suspend(update.EventModify)
s.watcher.Suspend(watcher.EventModify)
return s.Storage.Patch(gvk, uid, patch)
}

// Suspend delete events during Delete
func (s *GenericWatchStorage) Delete(gvk schema.GroupVersionKind, uid meta.UID) error {
s.watcher.suspend(update.EventDelete)
s.watcher.Suspend(watcher.EventDelete)
return s.Storage.Delete(gvk, uid)
}

Expand All @@ -86,7 +87,7 @@ func (s *GenericWatchStorage) SetEventStream(eventStream AssociatedEventStream)
}

func (s *GenericWatchStorage) Close() {
s.watcher.close()
s.watcher.Close()
s.monitor.Wait()
}

Expand All @@ -100,16 +101,16 @@ func (s *GenericWatchStorage) monitorFunc(mapped raw.MappedRawStorage, files []s
} else {
mapped.AddMapping(storage.NewKey(obj.GetKind(), obj.GetUID()), file)
// Send the event to the events channel
s.sendEvent(update.EventModify, obj)
s.sendEvent(watcher.EventModify, obj)
}
}

for {
if event, ok := <-s.watcher.updates; ok {
if event, ok := <-s.watcher.GetFileUpdateStream(); ok {
var obj meta.Object
var err error

if event.Event == update.EventDelete {
if event.Event == watcher.EventDelete {
var key storage.Key
if key, err = mapped.GetMapping(event.Path); err != nil {
log.Warnf("Failed to retrieve data for %q: %v", event.Path, err)
Expand All @@ -131,8 +132,8 @@ func (s *GenericWatchStorage) monitorFunc(mapped raw.MappedRawStorage, files []s
continue
}

// This is based on the key's existence instead of update.EventCreate,
// as Objects can get updated (via update.EventModify) to be conformant
// This is based on the key's existence instead of watcher.EventCreate,
// as Objects can get updated (via watcher.EventModify) to be conformant
if _, err = mapped.GetMapping(event.Path); err != nil {
mapped.AddMapping(storage.NewKey(obj.GetKind(), obj.GetUID()), event.Path)
}
Expand All @@ -146,7 +147,7 @@ func (s *GenericWatchStorage) monitorFunc(mapped raw.MappedRawStorage, files []s
}
}

func (s *GenericWatchStorage) sendEvent(event update.Event, obj meta.Object) {
func (s *GenericWatchStorage) sendEvent(event watcher.Event, obj meta.Object) {
if s.events != nil {
*s.events <- update.AssociatedUpdate{
Update: update.Update{
Expand Down
13 changes: 0 additions & 13 deletions pkg/storage/watch/update/associated.go

This file was deleted.

18 changes: 0 additions & 18 deletions pkg/storage/watch/update/event_test.go

This file was deleted.

8 changes: 0 additions & 8 deletions pkg/storage/watch/update/file.go

This file was deleted.

12 changes: 11 additions & 1 deletion pkg/storage/watch/update/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,21 @@ package update

import (
meta "github.com/weaveworks/ignite/pkg/apis/meta/v1alpha1"
"github.com/weaveworks/ignite/pkg/storage"
"github.com/weaveworks/ignite/pkg/util/watcher"
)

// Update bundles an Event with an
// APIType for Storage retrieval.
type Update struct {
Event Event
Event watcher.Event
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Event though watcher uses the Events the most, they are not watcher specific, the same events are used to detect changes for Objects in WatchStorage and SyncStorage. Should we move Event to watcher or keep it in update, to be generic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to keep the types fairly closely to where they are used. I did not understand the purpose of the types when seeing them grouped in the update package. I think they are a good fit in this package, at least until something more significant changes in the codebase.

Copy link
Contributor

@twelho twelho Jul 31, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it seems to work for WatchStorage and SyncStorage in terms of dependencies, LGTM 👍. After a rebase feel free to merge.

APIType meta.Object
}

// AssociatedUpdate bundles together an Update and a Storage
// implementation. This is used by SyncStorage to query the
// correct Storage for the updated Object.
type AssociatedUpdate struct {
Update
Storage storage.Storage
}
Loading