Skip to content

Commit

Permalink
Merge pull request #830 from rancher/storage-create-idempotency
Browse files Browse the repository at this point in the history
  • Loading branch information
kralicky authored Nov 18, 2022
2 parents 41453a8 + 5f45cae commit 7c1615e
Show file tree
Hide file tree
Showing 15 changed files with 190 additions and 76 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ require (
github.com/magefile/mage v1.14.0
github.com/mattn/go-tty v0.0.4
github.com/mitchellh/mapstructure v1.5.0
github.com/nats-io/nats.go v1.18.0
github.com/nats-io/nats.go v1.20.0
github.com/nats-io/nkeys v0.3.0
github.com/olebedev/when v0.0.0-20211212231525-59bd4edcf9d6
github.com/onsi/ginkgo/v2 v2.4.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2201,6 +2201,8 @@ github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5
github.com/nats-io/nats.go v1.15.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.18.0 h1:o480Ao6kuSSFyJO75rGTXCEPj7LGkY84C1Ye+Uhm4c0=
github.com/nats-io/nats.go v1.18.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.20.0 h1:T8JJnQfVSdh1CzGiwAOv5hEobYCBho/0EupGznYw0oM=
github.com/nats-io/nats.go v1.20.0/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
3 changes: 3 additions & 0 deletions pkg/gateway/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ func (s *GatewayHTTPServer) ListenAndServe(ctx context.Context) error {
}

metricsListener, err := net.Listen("tcp4", s.conf.MetricsListenAddress)
if err != nil {
return err
}

lg.With(
"api", listener.Addr().String(),
Expand Down
23 changes: 23 additions & 0 deletions pkg/storage/conformance/cluster_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,21 @@ func ClusterStoreTestSuite[T storage.ClusterStore](
Expect(clusters.Items[0].GetMetadata().Capabilities).To(HaveLen(1))
Expect(clusters.Items[0].GetMetadata().Capabilities[0].Name).To(Equal("foo"))
})
It("should fail if the cluster already exists", func() {
cluster := &corev1.Cluster{
Id: newIdWithLine(),
Metadata: &corev1.ClusterMetadata{
Labels: map[string]string{
"foo": "bar",
},
},
}
err := ts.CreateCluster(context.Background(), cluster)
Expect(err).NotTo(HaveOccurred())

err = ts.CreateCluster(context.Background(), cluster)
Expect(err).To(MatchError(storage.ErrAlreadyExists))
})
})
It("should list clusters with a label selector", func() {
create := func(labels map[string]string) *corev1.Cluster {
Expand Down Expand Up @@ -592,5 +607,13 @@ func ClusterStoreTestSuite[T storage.ClusterStore](
Expect(event.Previous.Id).To(Equal(cluster2.Id))
Expect(event.Current).To(BeNil())
})
When("deleting a cluster", func() {
It("should fail if the cluster does not exist", func() {
err := ts.DeleteCluster(context.Background(), &corev1.Reference{
Id: newIdWithLine(),
})
Expect(err).To(MatchError(storage.ErrNotFound))
})
})
}
}
20 changes: 20 additions & 0 deletions pkg/storage/conformance/rbac_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ func RBACStoreTestSuite[T storage.RBACStore](
Expect(err).NotTo(HaveOccurred())
Expect(all.Items).To(BeEmpty())
})
It("should fail if the role already exists", func() {
role := &corev1.Role{
Id: "foo",
}
err := ts.CreateRole(context.Background(), role)
Expect(err).NotTo(HaveOccurred())

err = ts.CreateRole(context.Background(), role)
Expect(err).To(MatchError(storage.ErrAlreadyExists))
})
})
Context("Role Bindings", func() {
It("should initially have no role bindings", func() {
Expand Down Expand Up @@ -101,6 +111,16 @@ func RBACStoreTestSuite[T storage.RBACStore](
Expect(err).NotTo(HaveOccurred())
Expect(all.Items).To(BeEmpty())
})
It("should fail if the role binding already exists", func() {
rb := &corev1.RoleBinding{
Id: "foo",
}
err := ts.CreateRoleBinding(context.Background(), rb)
Expect(err).NotTo(HaveOccurred())

err = ts.CreateRoleBinding(context.Background(), rb)
Expect(err).To(MatchError(storage.ErrAlreadyExists))
})
})
}
}
24 changes: 20 additions & 4 deletions pkg/storage/crds/rbac_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,30 @@ import (
)

func (c *CRDStore) CreateRole(ctx context.Context, role *corev1.Role) error {
return c.client.Create(ctx, &monitoringv1beta1.Role{
err := c.client.Create(ctx, &monitoringv1beta1.Role{
ObjectMeta: metav1.ObjectMeta{
Name: role.Id,
Namespace: c.namespace,
},
Spec: role,
})
if k8serrors.IsAlreadyExists(err) {
return storage.ErrAlreadyExists
}
return err
}

func (c *CRDStore) DeleteRole(ctx context.Context, ref *corev1.Reference) error {
return c.client.Delete(ctx, &monitoringv1beta1.Role{
err := c.client.Delete(ctx, &monitoringv1beta1.Role{
ObjectMeta: metav1.ObjectMeta{
Name: ref.Id,
Namespace: c.namespace,
},
})
if k8serrors.IsNotFound(err) {
return storage.ErrNotFound
}
return err
}

func (c *CRDStore) GetRole(ctx context.Context, ref *corev1.Reference) (*corev1.Role, error) {
Expand All @@ -46,22 +54,30 @@ func (c *CRDStore) GetRole(ctx context.Context, ref *corev1.Reference) (*corev1.
}

func (c *CRDStore) CreateRoleBinding(ctx context.Context, rb *corev1.RoleBinding) error {
return c.client.Create(ctx, &monitoringv1beta1.RoleBinding{
err := c.client.Create(ctx, &monitoringv1beta1.RoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: rb.Id,
Namespace: c.namespace,
},
Spec: rb,
})
if k8serrors.IsAlreadyExists(err) {
return storage.ErrAlreadyExists
}
return err
}

func (c *CRDStore) DeleteRoleBinding(ctx context.Context, ref *corev1.Reference) error {
return c.client.Delete(ctx, &monitoringv1beta1.RoleBinding{
err := c.client.Delete(ctx, &monitoringv1beta1.RoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: ref.Id,
Namespace: c.namespace,
},
})
if k8serrors.IsNotFound(err) {
return storage.ErrNotFound
}
return err
}

func (c *CRDStore) GetRoleBinding(ctx context.Context, ref *corev1.Reference) (*corev1.RoleBinding, error) {
Expand Down
11 changes: 11 additions & 0 deletions pkg/storage/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
)

var ErrNotFound = &NotFoundError{}
var ErrAlreadyExists = &AlreadyExistsError{}

type NotFoundError struct{}

Expand All @@ -16,3 +17,13 @@ func (e *NotFoundError) Error() string {
func (e *NotFoundError) GRPCStatus() *status.Status {
return status.New(codes.NotFound, e.Error())
}

type AlreadyExistsError struct{}

func (e *AlreadyExistsError) Error() string {
return "already exists"
}

func (e *AlreadyExistsError) GRPCStatus() *status.Status {
return status.New(codes.AlreadyExists, e.Error())
}
13 changes: 11 additions & 2 deletions pkg/storage/etcd/cluster_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,24 @@ import (

func (e *EtcdStore) CreateCluster(ctx context.Context, cluster *corev1.Cluster) error {
cluster.SetResourceVersion("")

data, err := protojson.Marshal(cluster)
if err != nil {
return fmt.Errorf("failed to marshal cluster: %w", err)
}
resp, err := e.Client.Put(ctx, path.Join(e.Prefix, clusterKey, cluster.Id), string(data))

key := path.Join(e.Prefix, clusterKey, cluster.Id)
resp, err := e.Client.Txn(ctx).If(
clientv3.Compare(clientv3.Version(key), "=", 0),
).Then(
clientv3.OpPut(key, string(data)),
).Commit()
if err != nil {
return fmt.Errorf("failed to create cluster: %w", err)
}
if !resp.Succeeded {
return storage.ErrAlreadyExists
}

cluster.SetResourceVersion(fmt.Sprint(resp.Header.Revision))
return nil
}
Expand Down
20 changes: 18 additions & 2 deletions pkg/storage/etcd/rbac_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,18 @@ func (e *EtcdStore) CreateRole(ctx context.Context, role *corev1.Role) error {
if err != nil {
return fmt.Errorf("failed to marshal role: %w", err)
}
_, err = e.Client.Put(ctx, path.Join(e.Prefix, roleKey, role.Id), string(data))
key := path.Join(e.Prefix, roleKey, role.Id)
resp, err := e.Client.Txn(ctx).If(
clientv3.Compare(clientv3.Version(key), "=", 0),
).Then(
clientv3.OpPut(key, string(data)),
).Commit()
if err != nil {
return fmt.Errorf("failed to create role: %w", err)
}
if !resp.Succeeded {
return storage.ErrAlreadyExists
}
return nil
}

Expand Down Expand Up @@ -57,10 +65,18 @@ func (e *EtcdStore) CreateRoleBinding(ctx context.Context, roleBinding *corev1.R
if err != nil {
return fmt.Errorf("failed to marshal role binding: %w", err)
}
_, err = e.Client.Put(ctx, path.Join(e.Prefix, roleBindingKey, roleBinding.Id), string(data))
key := path.Join(e.Prefix, roleBindingKey, roleBinding.Id)
resp, err := e.Client.Txn(ctx).If(
clientv3.Compare(clientv3.Version(key), "=", 0),
).Then(
clientv3.OpPut(key, string(data)),
).Commit()
if err != nil {
return fmt.Errorf("failed to create role binding: %w", err)
}
if !resp.Succeeded {
return storage.ErrAlreadyExists
}
return nil
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/jetstream/cluster_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ func (s *JetStreamStore) CreateCluster(ctx context.Context, cluster *corev1.Clus
}
rev, err := s.kv.Clusters.Create(cluster.Id, data)
if err != nil {
if errIsKeyAlreadyExists(err) {
return storage.ErrAlreadyExists
}
return fmt.Errorf("failed to create cluster: %w", err)
}
cluster.SetResourceVersion(fmt.Sprint(rev))
Expand All @@ -32,6 +35,9 @@ func (s *JetStreamStore) CreateCluster(ctx context.Context, cluster *corev1.Clus
func (s *JetStreamStore) DeleteCluster(ctx context.Context, ref *corev1.Reference) error {
_, err := s.GetCluster(ctx, ref)
if err != nil {
if errors.Is(err, nats.ErrKeyNotFound) {
return storage.ErrNotFound
}
return err
}
return s.kv.Clusters.Delete(ref.Id)
Expand Down
11 changes: 11 additions & 0 deletions pkg/storage/jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package jetstream

import (
"context"
"errors"
"fmt"
"strings"
"time"
Expand Down Expand Up @@ -169,3 +170,13 @@ func (s *JetStreamStore) KeyValueStore(prefix string) storage.KeyValueStore {
kv: bucket,
}
}

func errIsKeyAlreadyExists(err error) bool {
// TODO: this error code is not exported by the nats.go client
// https://github.com/nats-io/nats.go/issues/1134
apierror := &nats.APIError{}
if errors.As(err, &apierror) {
return apierror.ErrorCode == 10071
}
return false
}
10 changes: 8 additions & 2 deletions pkg/storage/jetstream/rbac_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ func (s *JetStreamStore) CreateRole(ctx context.Context, role *corev1.Role) erro
if err != nil {
return err
}
_, err = s.kv.Roles.Put(role.Id, data)
_, err = s.kv.Roles.Create(role.Id, data)
if errIsKeyAlreadyExists(err) {
return storage.ErrAlreadyExists
}
return err
}

Expand Down Expand Up @@ -49,7 +52,10 @@ func (s *JetStreamStore) CreateRoleBinding(ctx context.Context, rb *corev1.RoleB
if err != nil {
return err
}
_, err = s.kv.RoleBindings.Put(rb.Id, data)
_, err = s.kv.RoleBindings.Create(rb.Id, data)
if errIsKeyAlreadyExists(err) {
return storage.ErrAlreadyExists
}
return err
}

Expand Down
Loading

0 comments on commit 7c1615e

Please sign in to comment.