Skip to content
This repository has been archived by the owner on Dec 7, 2023. It is now read-only.

Commit

Permalink
Merge pull request #347 from weaveworks/gitops-toolkit
Browse files Browse the repository at this point in the history
Split packages so we can extract `gitops-toolkit`
  • Loading branch information
luxas authored Aug 19, 2019
2 parents fd39415 + fa11eda commit 29d163e
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 46 deletions.
2 changes: 1 addition & 1 deletion cmd/ignite-spawn/ignite-spawn.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,5 @@ func patchStopped(vm *api.VM) error {
*/

patch := []byte(`{"status":{"running":false,"ipAddresses":null,"runtime":null,"startTime":null}}`)
return patchutil.ApplyOnFile(constants.IGNITE_SPAWN_VM_FILE_PATH, patch, vm.GroupVersionKind())
return patchutil.NewPatcher(scheme.Serializer).ApplyOnFile(constants.IGNITE_SPAWN_VM_FILE_PATH, patch, vm.GroupVersionKind())
}
2 changes: 1 addition & 1 deletion pkg/apis/ignite/scheme/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/weaveworks/ignite/pkg/apis/ignite"
"github.com/weaveworks/ignite/pkg/apis/ignite/v1alpha1"
"github.com/weaveworks/ignite/pkg/apis/ignite/v1alpha2"
"github.com/weaveworks/ignite/pkg/storage/serializer"
"github.com/weaveworks/ignite/pkg/serializer"
)

var (
Expand Down
4 changes: 3 additions & 1 deletion pkg/gitops/gitops.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"time"

log "github.com/sirupsen/logrus"
"github.com/weaveworks/ignite/pkg/apis/ignite/scheme"
"github.com/weaveworks/ignite/pkg/constants"
"github.com/weaveworks/ignite/pkg/gitops/gitdir"
"github.com/weaveworks/ignite/pkg/operations/reconcile"
"github.com/weaveworks/ignite/pkg/storage/manifest"
Expand All @@ -22,7 +24,7 @@ func RunGitOps(url, branch string, paths []string) error {
gitDir.WaitForClone()

// Construct a manifest storage for the path backed by git
s, err := manifest.NewManifestStorage(gitDir.Dir())
s, err := manifest.NewTwoWayManifestStorage(gitDir.Dir(), constants.DATA_DIR, scheme.Serializer)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/providers/manifeststorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package manifeststorage

import (
log "github.com/sirupsen/logrus"
"github.com/weaveworks/ignite/pkg/apis/ignite/scheme"
"github.com/weaveworks/ignite/pkg/constants"
"github.com/weaveworks/ignite/pkg/providers"
"github.com/weaveworks/ignite/pkg/storage/cache"
Expand All @@ -12,7 +13,7 @@ var ManifestStorage *manifest.ManifestStorage

func SetManifestStorage() (err error) {
log.Trace("Initializing the ManifestStorage provider...")
ManifestStorage, err = manifest.NewManifestStorage(constants.MANIFEST_DIR)
ManifestStorage, err = manifest.NewTwoWayManifestStorage(constants.MANIFEST_DIR, constants.DATA_DIR, scheme.Serializer)
if err != nil {
return
}
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
api "github.com/weaveworks/ignite/pkg/apis/ignite"
"github.com/weaveworks/ignite/pkg/apis/ignite/scheme"
meta "github.com/weaveworks/ignite/pkg/apis/meta/v1alpha1"
"github.com/weaveworks/ignite/pkg/storage/serializer"
"github.com/weaveworks/ignite/pkg/serializer"
)

var s = serializer.NewSerializer(scheme.Scheme, nil)
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cache
import (
log "github.com/sirupsen/logrus"
meta "github.com/weaveworks/ignite/pkg/apis/meta/v1alpha1"
"github.com/weaveworks/ignite/pkg/serializer"
"github.com/weaveworks/ignite/pkg/storage"
"k8s.io/apimachinery/pkg/runtime/schema"
)
Expand Down Expand Up @@ -37,6 +38,10 @@ func NewCache(backingStorage storage.Storage) Cache {
return c
}

func (s *cache) Serializer() serializer.Serializer {
return s.storage.Serializer()
}

func (c *cache) New(gvk schema.GroupVersionKind) (meta.Object, error) {
// Request the storage to create the Object. The
// newly generated Object has not got an UID which
Expand Down
28 changes: 23 additions & 5 deletions pkg/storage/manifest/storage.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,40 @@
package manifest

import (
"github.com/weaveworks/ignite/pkg/apis/ignite/scheme"
"github.com/weaveworks/ignite/pkg/constants"
"github.com/weaveworks/ignite/pkg/serializer"
"github.com/weaveworks/ignite/pkg/storage"
"github.com/weaveworks/ignite/pkg/storage/sync"
"github.com/weaveworks/ignite/pkg/storage/watch"
)

func NewManifestStorage(dataDir string) (*ManifestStorage, error) {
ws, err := watch.NewGenericWatchStorage(storage.NewGenericStorage(storage.NewGenericMappedRawStorage(dataDir), scheme.Serializer))
// NewManifestStorage constructs a new storage that watches unstructured manifests in the specified directory,
// decodable using the given serializer.
func NewManifestStorage(manifestDir string, ser serializer.Serializer) (*ManifestStorage, error) {
ws, err := watch.NewGenericWatchStorage(storage.NewGenericStorage(storage.NewGenericMappedRawStorage(manifestDir), ser))
if err != nil {
return nil, err
}

ss := sync.NewSyncStorage(ws)

return &ManifestStorage{
Storage: ss,
}, nil
}

// NewManifestStorage constructs a new storage that watches unstructured manifests in the specified directory,
// decodable using the given serializer. However, all changes in the manifest directory, are also propagated to
// the structured data directory that's backed by the default storage implementation. Writes to this storage are
// propagated to both the manifest directory, and the data directory.
func NewTwoWayManifestStorage(manifestDir, dataDir string, ser serializer.Serializer) (*ManifestStorage, error) {
ws, err := watch.NewGenericWatchStorage(storage.NewGenericStorage(storage.NewGenericMappedRawStorage(manifestDir), ser))
if err != nil {
return nil, err
}

ss := sync.NewSyncStorage(
storage.NewGenericStorage(
storage.NewGenericRawStorage(constants.DATA_DIR), scheme.Serializer),
storage.NewGenericRawStorage(dataDir), ser),
ws)

return &ManifestStorage{
Expand Down
13 changes: 10 additions & 3 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"

meta "github.com/weaveworks/ignite/pkg/apis/meta/v1alpha1"
"github.com/weaveworks/ignite/pkg/storage/serializer"
"github.com/weaveworks/ignite/pkg/serializer"
patchutil "github.com/weaveworks/ignite/pkg/util/patch"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -44,23 +44,30 @@ type Storage interface {
Checksum(gvk schema.GroupVersionKind, uid meta.UID) (string, error)
// RawStorage returns the RawStorage instance backing this Storage
RawStorage() RawStorage
// Serializer returns the serializer
Serializer() serializer.Serializer
// Close closes all underlying resources (e.g. goroutines) used; before the application exits
Close() error
}

// NewGenericStorage constructs a new Storage
func NewGenericStorage(rawStorage RawStorage, serializer serializer.Serializer) Storage {
return &GenericStorage{rawStorage, serializer}
return &GenericStorage{rawStorage, serializer, patchutil.NewPatcher(serializer)}
}

// GenericStorage implements the Storage interface
type GenericStorage struct {
raw RawStorage
serializer serializer.Serializer
patcher patchutil.Patcher
}

var _ Storage = &GenericStorage{}

func (s *GenericStorage) Serializer() serializer.Serializer {
return s.serializer
}

// New creates a new Object for the specified kind
// TODO: Create better error handling if the GVK specified is not recognized
func (s *GenericStorage) New(gvk schema.GroupVersionKind) (meta.Object, error) {
Expand Down Expand Up @@ -141,7 +148,7 @@ func (s *GenericStorage) Patch(gvk schema.GroupVersionKind, uid meta.UID, patch
return err
}

newContent, err := patchutil.Apply(oldContent, patch, gvk)
newContent, err := s.patcher.Apply(oldContent, patch, gvk)
if err != nil {
return err
}
Expand Down
11 changes: 3 additions & 8 deletions pkg/storage/sync/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

log "github.com/sirupsen/logrus"
meta "github.com/weaveworks/ignite/pkg/apis/meta/v1alpha1"
"github.com/weaveworks/ignite/pkg/client"
"github.com/weaveworks/ignite/pkg/storage"
"github.com/weaveworks/ignite/pkg/storage/watch"
"github.com/weaveworks/ignite/pkg/storage/watch/update"
Expand Down Expand Up @@ -131,9 +130,6 @@ func (ss *SyncStorage) monitorFunc() {
log.Debug("SyncStorage: Monitoring thread started")
defer log.Debug("SyncStorage: Monitoring thread stopped")

// This is the internal client for propagating updates
c := client.NewClient(ss)

// TODO: Support detecting changes done when Ignite isn't running
// This is difficult to do though, as we have don't know which state is the latest
// For now, only update the state on write when Ignite is running
Expand All @@ -145,20 +141,19 @@ func (ss *SyncStorage) monitorFunc() {
case update.ObjectEventModify, update.ObjectEventCreate:
// First load the Object using the Storage given in the update,
// then set it using the client constructed above
updClient := client.NewClient(upd.Storage).Dynamic(upd.APIType.GetKind())
obj, err := updClient.Get(upd.APIType.GetUID())
obj, err := upd.Storage.Get(upd.APIType.GroupVersionKind(), upd.APIType.GetUID())
if err != nil {
log.Errorf("Failed to get Object with UID %q: %v", upd.APIType.GetUID(), err)
continue
}

if err = c.Dynamic(upd.APIType.GetKind()).Set(obj); err != nil {
if err = ss.Set(obj.GroupVersionKind(), obj); err != nil {
log.Errorf("Failed to set Object with UID %q: %v", upd.APIType.GetUID(), err)
continue
}
case update.ObjectEventDelete:
// For deletion we use the generated "fake" APIType object
if err := c.Dynamic(upd.APIType.GetKind()).Delete(upd.APIType.GetUID()); err != nil {
if err := ss.Delete(upd.APIType.GroupVersionKind(), upd.APIType.GetUID()); err != nil {
log.Errorf("Failed to delete Object with UID %q: %v", upd.APIType.GetUID(), err)
continue
}
Expand Down
20 changes: 11 additions & 9 deletions pkg/storage/watch/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"io/ioutil"

log "github.com/sirupsen/logrus"
api "github.com/weaveworks/ignite/pkg/apis/ignite"
"github.com/weaveworks/ignite/pkg/apis/ignite/scheme"
meta "github.com/weaveworks/ignite/pkg/apis/meta/v1alpha1"
"github.com/weaveworks/ignite/pkg/storage"
"github.com/weaveworks/ignite/pkg/storage/watch/update"
Expand Down Expand Up @@ -42,8 +40,12 @@ func NewGenericWatchStorage(s storage.Storage) (WatchStorage, error) {
return nil, err
}

// TODO: Fix this
gvs := s.Serializer().Scheme().PreferredVersionAllGroups()
groupName := gvs[0].Group

ws.monitor = sync.RunMonitor(func() {
ws.monitorFunc(ws.RawStorage(), files) // Offload the file registration to the goroutine
ws.monitorFunc(ws.RawStorage(), files, groupName) // Offload the file registration to the goroutine
})

return ws, nil
Expand Down Expand Up @@ -87,14 +89,14 @@ func (s *GenericWatchStorage) Close() error {
return nil
}

func (s *GenericWatchStorage) monitorFunc(raw storage.RawStorage, files []string) {
func (s *GenericWatchStorage) monitorFunc(raw storage.RawStorage, files []string, groupName string) {
log.Debug("GenericWatchStorage: Monitoring thread started")
defer log.Debug("GenericWatchStorage: Monitoring thread stopped")

// Send a MODIFY event for all files (and fill the mappings
// of the MappedRawStorage) before starting to monitor changes
for _, file := range files {
if obj, err := resolveAPIType(file); err != nil {
if obj, err := s.resolveAPIType(file); err != nil {
log.Warnf("Ignoring %q: %v", file, err)
} else {
if mapped, ok := raw.(storage.MappedRawStorage); ok {
Expand Down Expand Up @@ -132,12 +134,12 @@ func (s *GenericWatchStorage) monitorFunc(raw storage.RawStorage, files []string
obj.SetName("<deleted>")
obj.SetUID(key.UID)
obj.SetGroupVersionKind(schema.GroupVersionKind{
Group: api.GroupName,
Group: groupName,
Version: runtime.APIVersionInternal,
Kind: key.Kind.Title(),
})
} else {
if obj, err = resolveAPIType(event.Path); err != nil {
if obj, err = s.resolveAPIType(event.Path); err != nil {
log.Warnf("Ignoring %q: %v", event.Path, err)
continue
}
Expand Down Expand Up @@ -187,7 +189,7 @@ func (s *GenericWatchStorage) sendEvent(event update.ObjectEvent, obj meta.Objec
}
}

func resolveAPIType(path string) (meta.Object, error) {
func (s *GenericWatchStorage) resolveAPIType(path string) (meta.Object, error) {
obj := meta.NewAPIType()
content, err := ioutil.ReadFile(path)
if err != nil {
Expand All @@ -202,7 +204,7 @@ func resolveAPIType(path string) (meta.Object, error) {
gvk := obj.GroupVersionKind()

// Don't decode API objects unknown to Ignite (e.g. Kubernetes manifests)
if !scheme.Scheme.Recognizes(gvk) {
if !s.Serializer().Scheme().Recognizes(gvk) {
return nil, fmt.Errorf("unknown API version %q and/or kind %q", obj.APIVersion, obj.Kind)
}

Expand Down
43 changes: 27 additions & 16 deletions pkg/util/patch/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,37 @@ import (
"fmt"
"io/ioutil"

"github.com/weaveworks/ignite/pkg/apis/ignite/scheme"
meta "github.com/weaveworks/ignite/pkg/apis/meta/v1alpha1"
"github.com/weaveworks/ignite/pkg/serializer"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/strategicpatch"
)

// The default serializer used here. In the future we maybe want to
// make this configurable
var serializer = scheme.Serializer
type Patcher interface {
Create(new meta.Object, applyFn func(meta.Object) error) ([]byte, error)
Apply(original, patch []byte, gvk schema.GroupVersionKind) ([]byte, error)
ApplyOnFile(filePath string, patch []byte, gvk schema.GroupVersionKind) error
}

func NewPatcher(s serializer.Serializer) Patcher {
return &patcher{serializer: s}
}

type patcher struct {
serializer serializer.Serializer
}

// Create is a helper that creates a patch out of the change made in applyFn
func Create(new meta.Object, applyFn func(meta.Object) error) ([]byte, error) {
func (p *patcher) Create(new meta.Object, applyFn func(meta.Object) error) ([]byte, error) {
old := new.DeepCopyObject().(meta.Object)

oldbytes, err := serializer.EncodeJSON(old)
oldbytes, err := p.serializer.EncodeJSON(old)
if err != nil {
return nil, err
}

emptyobj, err := serializer.Scheme().New(old.GroupVersionKind())
emptyobj, err := p.serializer.Scheme().New(old.GroupVersionKind())
if err != nil {
return nil, err
}
Expand All @@ -33,7 +43,7 @@ func Create(new meta.Object, applyFn func(meta.Object) error) ([]byte, error) {
return nil, err
}

newbytes, err := serializer.EncodeJSON(new)
newbytes, err := p.serializer.EncodeJSON(new)
if err != nil {
return nil, err
}
Expand All @@ -46,8 +56,8 @@ func Create(new meta.Object, applyFn func(meta.Object) error) ([]byte, error) {
return patchBytes, nil
}

func Apply(original, patch []byte, gvk schema.GroupVersionKind) ([]byte, error) {
emptyobj, err := serializer.Scheme().New(gvk)
func (p *patcher) Apply(original, patch []byte, gvk schema.GroupVersionKind) ([]byte, error) {
emptyobj, err := p.serializer.Scheme().New(gvk)
if err != nil {
return nil, err
}
Expand All @@ -57,16 +67,16 @@ func Apply(original, patch []byte, gvk schema.GroupVersionKind) ([]byte, error)
return nil, err
}

return serializerEncode(b)
return p.serializerEncode(b)
}

func ApplyOnFile(filePath string, patch []byte, gvk schema.GroupVersionKind) error {
func (p *patcher) ApplyOnFile(filePath string, patch []byte, gvk schema.GroupVersionKind) error {
oldContent, err := ioutil.ReadFile(filePath)
if err != nil {
return err
}

newContent, err := Apply(oldContent, patch, gvk)
newContent, err := p.Apply(oldContent, patch, gvk)
if err != nil {
return err
}
Expand All @@ -77,10 +87,11 @@ func ApplyOnFile(filePath string, patch []byte, gvk schema.GroupVersionKind) err
// StrategicMergePatch returns an unindented, unorganized JSON byte slice,
// this helper takes that as an input and returns the same JSON re-encoded
// with the serializer so it conforms to a runtime.Object
func serializerEncode(input []byte) (result []byte, err error) {
// TODO: Just use encoding/json.Indent here instead?
func (p *patcher) serializerEncode(input []byte) (result []byte, err error) {
var obj runtime.Object
if obj, err = serializer.Decode(input, true); err == nil {
result, err = serializer.EncodeJSON(obj)
if obj, err = p.serializer.Decode(input, true); err == nil {
result, err = p.serializer.EncodeJSON(obj)
}

return
Expand Down

0 comments on commit 29d163e

Please sign in to comment.