Skip to content
This repository has been archived by the owner on Dec 7, 2023. It is now read-only.

Commit

Permalink
WIP: Changes for making the gitops toolkit work
Browse files Browse the repository at this point in the history
  • Loading branch information
luxas committed Aug 14, 2019
1 parent f351c69 commit 5c4534e
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 57 deletions.
179 changes: 179 additions & 0 deletions pkg/serializer/serializer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package serializer

import (
"fmt"
"io/ioutil"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
k8sserializer "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
)

// Serializer is an interface providing high-level decoding/encoding functionality
// for types registered in a *runtime.Scheme
type Serializer interface {
// DecodeInto takes byte content and a target object to serialize the data into
DecodeInto(content []byte, obj runtime.Object) error
// DecodeFileInto takes a file path and a target object to serialize the data into
DecodeFileInto(filePath string, obj runtime.Object) error

// Decode takes byte content and returns the target object
Decode(content []byte, internal bool) (runtime.Object, error)
// DecodeFile takes a file path and returns the target object
DecodeFile(filePath string, internal bool) (runtime.Object, error)

// EncodeYAML encodes the specified object for a specific version to YAML bytes
EncodeYAML(obj runtime.Object) ([]byte, error)
// EncodeJSON encodes the specified object for a specific version to pretty JSON bytes
EncodeJSON(obj runtime.Object) ([]byte, error)

// DefaultInternal populates the given internal object with the preferred external version's defaults
DefaultInternal(cfg runtime.Object) error

// Scheme provides access to the underlying runtime.Scheme
Scheme() *runtime.Scheme
}

// NewSerializer constructs a new serializer based on a scheme, and optionally a codecfactory
func NewSerializer(scheme *runtime.Scheme, codecs *k8sserializer.CodecFactory) Serializer {
if scheme == nil {
panic("scheme must not be nil")
}

if codecs == nil {
codecs = &k8sserializer.CodecFactory{}
*codecs = k8sserializer.NewCodecFactory(scheme)
}

// Allow both YAML and JSON inputs (JSON is a subset of YAML), and deserialize in strict mode
strictSerializer := json.NewSerializerWithOptions(json.DefaultMetaFactory, scheme, scheme, json.SerializerOptions{
Yaml: true,
Strict: true,
})

return &serializer{
scheme: scheme,
codecs: codecs,
// Construct a codec that uses the strict serializer, but also performs defaulting & conversion
decoder: codecs.CodecForVersions(nil, strictSerializer, nil, runtime.InternalGroupVersioner),
}
}

// serializer implements the Serializer interface
type serializer struct {
scheme *runtime.Scheme
codecs *k8sserializer.CodecFactory
decoder runtime.Decoder
}

// Scheme provides access to the underlying runtime.Scheme
func (s *serializer) Scheme() *runtime.Scheme {
return s.scheme
}

// DecodeFileInto takes a file path and a target object to serialize the data into
func (s *serializer) DecodeFileInto(filePath string, obj runtime.Object) error {
content, err := ioutil.ReadFile(filePath)
if err != nil {
return err
}

return s.DecodeInto(content, obj)
}

// DecodeInto takes byte content and a target object to serialize the data into
func (s *serializer) DecodeInto(content []byte, obj runtime.Object) error {
return runtime.DecodeInto(s.decoder, content, obj)
}

// DecodeFile takes a file path and returns the target object
func (s *serializer) DecodeFile(filePath string, internal bool) (runtime.Object, error) {
content, err := ioutil.ReadFile(filePath)
if err != nil {
return nil, err
}

return s.Decode(content, internal)
}

// Decode takes byte content and returns the target object
func (s *serializer) Decode(content []byte, internal bool) (runtime.Object, error) {
obj, err := runtime.Decode(s.decoder, content)
if err != nil {
return nil, err
}
// Default the object
s.scheme.Default(obj)

// If we did not request an internal conversion, return quickly
if !internal {
return obj, nil
}
// Return the internal version of the object
return s.scheme.ConvertToVersion(obj, runtime.InternalGroupVersioner)
}

// EncodeYAML encodes the specified object for a specific version to YAML bytes
func (s *serializer) EncodeYAML(obj runtime.Object) ([]byte, error) {
return s.encode(obj, runtime.ContentTypeYAML, false)
}

// EncodeJSON encodes the specified object for a specific version to pretty JSON bytes
func (s *serializer) EncodeJSON(obj runtime.Object) ([]byte, error) {
return s.encode(obj, runtime.ContentTypeJSON, true)
}

func (s *serializer) encode(obj runtime.Object, mediaType string, pretty bool) ([]byte, error) {
info, ok := runtime.SerializerInfoForMediaType(s.codecs.SupportedMediaTypes(), mediaType)
if !ok {
return nil, fmt.Errorf("unable to locate encoder -- %q is not a supported media type", mediaType)
}

serializer := info.Serializer
if pretty {
serializer = info.PrettySerializer
}

gvk, err := s.externalGVKForObject(obj)
if err != nil {
return nil, err
}

encoder := s.codecs.EncoderForVersion(serializer, gvk.GroupVersion())
return runtime.Encode(encoder, obj)
}

// DefaultInternal populates the given internal object with the preferred external version's defaults
func (s *serializer) DefaultInternal(cfg runtime.Object) error {
gvk, err := s.externalGVKForObject(cfg)
if err != nil {
return err
}
external, err := s.scheme.New(*gvk)
if err != nil {
return nil
}
if err := s.scheme.Convert(cfg, external, nil); err != nil {
return err
}
s.scheme.Default(external)
return s.scheme.Convert(external, cfg, nil)
}

func (s *serializer) externalGVKForObject(cfg runtime.Object) (*schema.GroupVersionKind, error) {
gvks, unversioned, err := s.scheme.ObjectKinds(cfg)
if unversioned || err != nil || len(gvks) != 1 {
return nil, fmt.Errorf("unversioned %t or err %v or invalid gvks %v", unversioned, err, gvks)
}

gvk := gvks[0]
gvs := s.scheme.PrioritizedVersionsForGroup(gvk.Group)
if len(gvs) < 1 {
return nil, fmt.Errorf("expected some version to be registered for group %s", gvk.Group)
}

// Use the preferred (external) version
gvk.Version = gvs[0].Version
return &gvk, nil
}
50 changes: 50 additions & 0 deletions pkg/serializer/serializer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package serializer_test

import (
"testing"

api "github.com/weaveworks/ignite/pkg/apis/ignite"
"github.com/weaveworks/ignite/pkg/apis/ignite/scheme"
meta "github.com/weaveworks/ignite/pkg/apis/meta/v1alpha1"
"github.com/weaveworks/ignite/pkg/serializer"
)

var s = serializer.NewSerializer(scheme.Scheme, nil)
var sampleobj = &api.VM{
ObjectMeta: meta.ObjectMeta{
Name: "foo",
UID: meta.UID("1234"),
},
Spec: api.VMSpec{
CPUs: 1,
},
}
var samplejson = []byte(`{"kind":"VM","apiVersion":"ignite.weave.works/v1alpha1","metadata":{"name":"foo","uid":"1234"},"spec":{"cpus":1}}`)
var nonstrictjson = []byte(`{"kind":"VM","apiVersion":"ignite.weave.works/v1alpha1","metadata":{"name":"foo","uid":"1234"},"spec":{"cpus":1, "foo": "bar"}}`)

func TestEncodeJSON(t *testing.T) {
b, err := s.EncodeJSON(sampleobj)
t.Fatal(string(b), err)
}

func TestEncodeYAML(t *testing.T) {
b, err := s.EncodeYAML(sampleobj)
t.Fatal(string(b), err)
}

func TestDecode(t *testing.T) {
obj, err := s.Decode(samplejson)
t.Fatal(obj, err)
}

func TestDecodeInto(t *testing.T) {
vm := &api.VM{}
err := s.DecodeInto(samplejson, vm)
t.Fatal(*vm, err)
}

func TestDecodeStrict(t *testing.T) {
vm := &api.VM{}
err := s.DecodeInto(nonstrictjson, vm)
t.Fatal(vm, err)
}
5 changes: 5 additions & 0 deletions pkg/storage/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cache
import (
log "github.com/sirupsen/logrus"
meta "github.com/weaveworks/ignite/pkg/apis/meta/v1alpha1"
"github.com/weaveworks/ignite/pkg/serializer"
"github.com/weaveworks/ignite/pkg/storage"
"k8s.io/apimachinery/pkg/runtime/schema"
)
Expand Down Expand Up @@ -37,6 +38,10 @@ func NewCache(backingStorage storage.Storage) Cache {
return c
}

func (s *cache) Serializer() serializer.Serializer {
return s.storage.Serializer()
}

func (c *cache) New(gvk schema.GroupVersionKind) (meta.Object, error) {
// Request the storage to create the Object. The
// newly generated Object has not got an UID which
Expand Down
9 changes: 4 additions & 5 deletions pkg/storage/manifest/storage.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
package manifest

import (
"github.com/weaveworks/ignite/pkg/apis/ignite/scheme"
"github.com/weaveworks/ignite/pkg/constants"
"github.com/weaveworks/ignite/pkg/serializer"
"github.com/weaveworks/ignite/pkg/storage"
"github.com/weaveworks/ignite/pkg/storage/sync"
"github.com/weaveworks/ignite/pkg/storage/watch"
)

func NewManifestStorage(dataDir string) (*ManifestStorage, error) {
ws, err := watch.NewGenericWatchStorage(storage.NewGenericStorage(storage.NewGenericMappedRawStorage(dataDir), scheme.Serializer))
func NewManifestStorage(manifestDir, dataDir string, ser serializer.Serializer) (*ManifestStorage, error) {
ws, err := watch.NewGenericWatchStorage(storage.NewGenericStorage(storage.NewGenericMappedRawStorage(manifestDir), ser))
if err != nil {
return nil, err
}

ss := sync.NewSyncStorage(
storage.NewGenericStorage(
storage.NewGenericRawStorage(constants.DATA_DIR), scheme.Serializer),
storage.NewGenericRawStorage(dataDir), ser),
ws)

return &ManifestStorage{
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/rawstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (r *GenericRawStorage) realPath(key AnyKey) string {
// KindKeys get no special treatment
case Key:
// Keys get the metadata filename added to the returned path
file = constants.METADATA
file = constants.MetadataJSON
default:
panic(fmt.Sprintf("invalid key type received: %T", key))
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"

meta "github.com/weaveworks/ignite/pkg/apis/meta/v1alpha1"
"github.com/weaveworks/ignite/pkg/storage/serializer"
"github.com/weaveworks/ignite/pkg/serializer"
patchutil "github.com/weaveworks/ignite/pkg/util/patch"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -44,23 +44,30 @@ type Storage interface {
Checksum(gvk schema.GroupVersionKind, uid meta.UID) (string, error)
// RawStorage returns the RawStorage instance backing this Storage
RawStorage() RawStorage
// Serializer returns the serializer
Serializer() serializer.Serializer
// Close closes all underlying resources (e.g. goroutines) used; before the application exits
Close() error
}

// NewGenericStorage constructs a new Storage
func NewGenericStorage(rawStorage RawStorage, serializer serializer.Serializer) Storage {
return &GenericStorage{rawStorage, serializer}
return &GenericStorage{rawStorage, serializer, patchutil.NewPatcher(serializer)}
}

// GenericStorage implements the Storage interface
type GenericStorage struct {
raw RawStorage
serializer serializer.Serializer
patcher patchutil.Patcher
}

var _ Storage = &GenericStorage{}

func (s *GenericStorage) Serializer() serializer.Serializer {
return s.serializer
}

// New creates a new Object for the specified kind
// TODO: Create better error handling if the GVK specified is not recognized
func (s *GenericStorage) New(gvk schema.GroupVersionKind) (meta.Object, error) {
Expand Down Expand Up @@ -141,7 +148,7 @@ func (s *GenericStorage) Patch(gvk schema.GroupVersionKind, uid meta.UID, patch
return err
}

newContent, err := patchutil.Apply(oldContent, patch, gvk)
newContent, err := s.patcher.Apply(oldContent, patch, gvk)
if err != nil {
return err
}
Expand Down
11 changes: 3 additions & 8 deletions pkg/storage/sync/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

log "github.com/sirupsen/logrus"
meta "github.com/weaveworks/ignite/pkg/apis/meta/v1alpha1"
"github.com/weaveworks/ignite/pkg/client"
"github.com/weaveworks/ignite/pkg/storage"
"github.com/weaveworks/ignite/pkg/storage/watch"
"github.com/weaveworks/ignite/pkg/storage/watch/update"
Expand Down Expand Up @@ -131,9 +130,6 @@ func (ss *SyncStorage) monitorFunc() {
log.Debug("SyncStorage: Monitoring thread started")
defer log.Debug("SyncStorage: Monitoring thread stopped")

// This is the internal client for propagating updates
c := client.NewClient(ss)

// TODO: Support detecting changes done when Ignite isn't running
// This is difficult to do though, as we have don't know which state is the latest
// For now, only update the state on write when Ignite is running
Expand All @@ -145,20 +141,19 @@ func (ss *SyncStorage) monitorFunc() {
case update.ObjectEventModify, update.ObjectEventCreate:
// First load the Object using the Storage given in the update,
// then set it using the client constructed above
updClient := client.NewClient(upd.Storage).Dynamic(upd.APIType.GetKind())
obj, err := updClient.Get(upd.APIType.GetUID())
obj, err := upd.Storage.Get(upd.APIType.GroupVersionKind(), upd.APIType.GetUID())
if err != nil {
log.Errorf("Failed to get Object with UID %q: %v", upd.APIType.GetUID(), err)
continue
}

if err = c.Dynamic(upd.APIType.GetKind()).Set(obj); err != nil {
if err = ss.Set(obj.GroupVersionKind(), obj); err != nil {
log.Errorf("Failed to set Object with UID %q: %v", upd.APIType.GetUID(), err)
continue
}
case update.ObjectEventDelete:
// For deletion we use the generated "fake" APIType object
if err := c.Dynamic(upd.APIType.GetKind()).Delete(upd.APIType.GetUID()); err != nil {
if err := ss.Delete(upd.APIType.GroupVersionKind(), upd.APIType.GetUID()); err != nil {
log.Errorf("Failed to delete Object with UID %q: %v", upd.APIType.GetUID(), err)
continue
}
Expand Down
Loading

0 comments on commit 5c4534e

Please sign in to comment.