Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add CRUD/Refresh Warehouse endpoints #1055

Merged
merged 6 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions api/service/v1alpha1/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ service KargoService {

rpc QueryFreight(QueryFreightRequest) returns (QueryFreightResponse);

/* Warehouse APIs */

rpc ListWarehouses(ListWarehousesRequest) returns (ListWarehousesResponse);
rpc GetWarehouse(GetWarehouseRequest) returns (GetWarehouseResponse);
rpc CreateWarehouse(CreateWarehouseRequest) returns (CreateWarehouseResponse);
rpc UpdateWarehouse(UpdateWarehouseRequest) returns (UpdateWarehouseResponse);
rpc DeleteWarehouse(DeleteWarehouseRequest) returns (DeleteWarehouseResponse);
rpc RefreshWarehouse(RefreshWarehouseRequest) returns (RefreshWarehouseResponse);
}

message ComponentVersions {
Expand Down Expand Up @@ -418,3 +426,66 @@ message QueryFreightResponse {
message FreightList {
repeated git.luolix.top.akuity.kargo.pkg.api.v1alpha1.Freight freight = 1;
}

message ListWarehousesRequest {
string project = 1;
}

message ListWarehousesResponse {
repeated git.luolix.top.akuity.kargo.pkg.api.v1alpha1.Warehouse warehouses = 1;
}

message GetWarehouseRequest {
string project = 1;
string name = 2;
}

message GetWarehouseResponse {
git.luolix.top.akuity.kargo.pkg.api.v1alpha1.Warehouse warehouse = 1;
}

message TypedWarehouseSpec {
string project = 1;
string name = 2;
git.luolix.top.akuity.kargo.pkg.api.v1alpha1.WarehouseSpec spec = 3;
}

message CreateWarehouseRequest {
oneof warehouse {
TypedWarehouseSpec typed = 1;
string yaml = 2;
}
}

message CreateWarehouseResponse {
git.luolix.top.akuity.kargo.pkg.api.v1alpha1.Warehouse warehouse = 1;
}

message UpdateWarehouseRequest {
oneof warehouse {
TypedWarehouseSpec typed = 1;
string yaml = 2;
}
}

message UpdateWarehouseResponse {
git.luolix.top.akuity.kargo.pkg.api.v1alpha1.Warehouse warehouse = 1;
}

message DeleteWarehouseRequest {
string project = 1;
string name = 2;
}

message DeleteWarehouseResponse {
/* explicitly empty */
}

message RefreshWarehouseRequest {
string project = 1;
string name = 2;
}

message RefreshWarehouseResponse {
git.luolix.top.akuity.kargo.pkg.api.v1alpha1.Warehouse warehouse = 1;
}
31 changes: 31 additions & 0 deletions api/v1alpha1/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package v1alpha1

import (
"context"
"fmt"
"time"

"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func refreshObject(
ctx context.Context,
c client.Client,
obj client.Object,
nowFunc func() time.Time,
) error {
patchBytes := []byte(
fmt.Sprintf(
`{"metadata":{"annotations":{"%s":"%s"}}}`,
AnnotationKeyRefresh,
nowFunc().UTC().Format(time.RFC3339),
),
)
patch := client.RawPatch(types.MergePatchType, patchBytes)
if err := c.Patch(ctx, obj, patch); err != nil {
return errors.Wrap(err, "patch annotation")
}
return nil
}
94 changes: 94 additions & 0 deletions api/v1alpha1/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package v1alpha1

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func Test_refreshObject(t *testing.T) {
scheme := k8sruntime.NewScheme()
require.NoError(t, SchemeBuilder.AddToScheme(scheme))

t.Parallel()
newFakeClient := func(obj ...client.Object) client.Client {
return fake.NewClientBuilder().
WithScheme(scheme).
WithObjects(obj...).
Build()
}
mockNow := func() time.Time {
return time.Date(2023, 11, 2, 0, 0, 0, 0, time.UTC)
}
testCases := map[string]struct {
obj client.Object
cli client.Client
nowFunc func() time.Time
errExpected bool
}{
"stage": {
obj: &Stage{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test",
Name: "stage",
},
},
cli: newFakeClient(&Stage{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test",
Name: "stage",
},
}),
nowFunc: mockNow,
},
"stage with refresh annotation key": {
obj: &Stage{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test",
Name: "stage",
},
},
cli: newFakeClient(&Stage{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test",
Name: "stage",
Annotations: map[string]string{
AnnotationKeyRefresh: time.Date(2023, 11, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339),
},
},
}),
nowFunc: mockNow,
},
"non-existing stage": {
obj: &Stage{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test",
Name: "stage",
},
},
cli: newFakeClient(),
nowFunc: mockNow,
errExpected: true,
},
}
for name, tc := range testCases {
tc := tc
t.Run(name, func(t *testing.T) {
err := refreshObject(context.Background(), tc.cli, tc.obj, tc.nowFunc)
if tc.errExpected {
require.Error(t, err)
return
}
require.Contains(t, tc.obj.GetAnnotations(), AnnotationKeyRefresh)
actual, err := time.Parse(time.RFC3339, tc.obj.GetAnnotations()[AnnotationKeyRefresh])
require.NoError(t, err)
require.Equal(t, tc.nowFunc(), actual)
})
}
}
19 changes: 9 additions & 10 deletions api/v1alpha1/stage_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,16 @@ func RefreshStage(
c client.Client,
namespacedName types.NamespacedName,
) (*Stage, error) {
now := time.Now().UTC().Format(time.RFC3339)
stage := Stage{}
stage.Name = namespacedName.Name
stage.Namespace = namespacedName.Namespace
patchBytes := []byte(fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, AnnotationKeyRefresh, now))
patch := client.RawPatch(types.MergePatchType, patchBytes)
err := c.Patch(ctx, &stage, patch)
if err != nil {
return nil, err
stage := &Stage{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespacedName.Namespace,
Name: namespacedName.Name,
},
}
return &stage, nil
if err := refreshObject(ctx, c, stage, time.Now); err != nil {
return nil, errors.Wrap(err, "refresh")
}
return stage, nil
}

// ClearStageRefresh is called by the Stage controller to clear the refresh
Expand Down
17 changes: 17 additions & 0 deletions api/v1alpha1/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,20 @@ message Subscriptions {
repeated StageSubscription upstream_stages = 2 [json_name = "upstreamStages"];
string warehouse = 3 [json_name = "warehouse"];
}

message Warehouse {
string api_version = 1 [json_name = "apiVersion"];
string kind = 2 [json_name = "kind"];
git.luolix.top.akuity.kargo.pkg.api.metav1.ObjectMeta metadata = 3 [json_name = "metadata"];
WarehouseSpec spec = 4 [json_name = "spec"];
WarehouseStatus status = 5 [json_name = "status"];
}

message WarehouseSpec {
repeated RepoSubscription subscriptions = 1 [json_name = "subscriptions"];
}

message WarehouseStatus {
string error = 1 [json_name = "error"];
int64 observed_generation = 2 [json_name = "observedGeneration"];
}
23 changes: 23 additions & 0 deletions api/v1alpha1/warehouse_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package v1alpha1

import (
"context"
"time"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand All @@ -30,3 +32,24 @@ func GetWarehouse(
}
return &warehouse, nil
}

// RefreshWarehouse forces reconciliation of a Warehouse by setting an annotation
// on the Warehouse, causing the controller to reconcile it. Currently, the
// annotation value is the timestamp of the request, but might in the
// future include additional metadata/context necessary for the request.
func RefreshWarehouse(
ctx context.Context,
c client.Client,
namespacedName types.NamespacedName,
) (*Warehouse, error) {
warehouse := &Warehouse{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespacedName.Namespace,
Name: namespacedName.Name,
},
}
if err := refreshObject(ctx, c, warehouse, time.Now); err != nil {
return nil, errors.Wrap(err, "refresh")
}
return warehouse, nil
}
1 change: 1 addition & 0 deletions charts/kargo/templates/api/cluster-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ rules:
resources:
- promotionpolicies
- stages
- warehouses
verbs:
- "*"
- apiGroups:
Expand Down
54 changes: 54 additions & 0 deletions internal/api/create_warehouse_v1alpha1.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package api

import (
"context"

"connectrpc.com/connect"
"github.com/pkg/errors"
kubeerr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/yaml"

kargoapi "github.com/akuity/kargo/api/v1alpha1"
typesv1alpha1 "github.com/akuity/kargo/internal/api/types/v1alpha1"
svcv1alpha1 "github.com/akuity/kargo/pkg/api/service/v1alpha1"
)

func (s *server) CreateWarehouse(
ctx context.Context,
req *connect.Request[svcv1alpha1.CreateWarehouseRequest],
) (*connect.Response[svcv1alpha1.CreateWarehouseResponse], error) {
var warehouse kargoapi.Warehouse
switch {
case req.Msg.GetYaml() != "":
if err := yaml.Unmarshal([]byte(req.Msg.GetYaml()), &warehouse); err != nil {
devholic marked this conversation as resolved.
Show resolved Hide resolved
return nil, connect.NewError(connect.CodeInvalidArgument, errors.Wrap(err, "invalid yaml"))
}
case req.Msg.GetTyped() != nil:
warehouse = kargoapi.Warehouse{
ObjectMeta: metav1.ObjectMeta{
Namespace: req.Msg.GetTyped().GetProject(),
Name: req.Msg.GetTyped().GetName(),
},
Spec: typesv1alpha1.FromWarehouseSpecProto(req.Msg.GetTyped().GetSpec()),
}
default:
return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("warehouse should not be empty"))
}

if err := validateProjectAndWarehouseName(warehouse.GetNamespace(), warehouse.GetName()); err != nil {
return nil, err
}
if err := s.validateProject(ctx, warehouse.GetNamespace()); err != nil {
return nil, err
}
if err := s.client.Create(ctx, &warehouse); err != nil {
if kubeerr.IsAlreadyExists(err) {
return nil, connect.NewError(connect.CodeAlreadyExists, err)
}
return nil, connect.NewError(connect.CodeInternal, errors.Wrap(err, "create warehouse"))
}
return connect.NewResponse(&svcv1alpha1.CreateWarehouseResponse{
Warehouse: typesv1alpha1.ToWarehouseProto(warehouse),
}), nil
}
41 changes: 41 additions & 0 deletions internal/api/delete_warehouse_v1alpha1.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package api

import (
"context"
"fmt"

"connectrpc.com/connect"
kubeerr "k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/client"

kargoapi "github.com/akuity/kargo/api/v1alpha1"
svcv1alpha1 "github.com/akuity/kargo/pkg/api/service/v1alpha1"
)

func (s *server) DeleteWarehouse(
ctx context.Context,
req *connect.Request[svcv1alpha1.DeleteWarehouseRequest],
) (*connect.Response[svcv1alpha1.DeleteWarehouseResponse], error) {
if err := validateProjectAndWarehouseName(req.Msg.GetProject(), req.Msg.GetName()); err != nil {
return nil, err
}
if err := s.validateProject(ctx, req.Msg.GetProject()); err != nil {
return nil, err
}
var warehouse kargoapi.Warehouse
key := client.ObjectKey{
Namespace: req.Msg.GetProject(),
Name: req.Msg.GetName(),
}
if err := s.client.Get(ctx, key, &warehouse); err != nil {
if kubeerr.IsNotFound(err) {
return nil, connect.NewError(connect.CodeNotFound,
fmt.Errorf("warehouse %q not found", key.String()))
}
return nil, connect.NewError(connect.CodeInternal, err)
}
if err := s.client.Delete(ctx, &warehouse); err != nil && !kubeerr.IsNotFound(err) {
return nil, connect.NewError(connect.CodeInternal, err)
}
return connect.NewResponse(&svcv1alpha1.DeleteWarehouseResponse{}), nil
}
Loading