Skip to content

Commit

Permalink
feat(pkger): make all pkg applications stateful
Browse files Browse the repository at this point in the history
if no stack is provided then one will be provided for you.

closes: #17997
  • Loading branch information
jsteenb2 committed May 28, 2020
1 parent b25b402 commit 53794bf
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 12 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## v2.0.0-beta.12 [TBD]

### Features

1. [18279](https://github.com/influxdata/influxdb/pull/18279): Make all pkg applications stateful via stacks

## v2.0.0-beta.11 [2020-05-26]

### Features
Expand Down
9 changes: 9 additions & 0 deletions cmd/influxd/launcher/pkger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1439,6 +1439,8 @@ func TestLauncher_Pkger(t *testing.T) {
}),
pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)),
pkger.WithNotificationRuleSVC(l.NotificationRuleService()),
pkger.WithOrganizationService(l.OrganizationService()),
pkger.WithStore(pkger.NewStoreKV(l.kvStore)),
pkger.WithTaskSVC(l.TaskServiceKV()),
pkger.WithTelegrafSVC(l.TelegrafService(t)),
pkger.WithVariableSVC(l.VariableService(t)),
Expand Down Expand Up @@ -1646,6 +1648,8 @@ spec:
impact, err := svc.Apply(ctx, l.Org.ID, l.User.ID, newPkg(t))
require.NoError(t, err)

assert.NotZero(t, impact.StackID)

sum1 := impact.Summary

labels := sum1.Labels
Expand Down Expand Up @@ -2174,6 +2178,8 @@ spec:
pkger.WithLabelSVC(l.LabelService(t)),
pkger.WithNotificationEndpointSVC(l.NotificationEndpointService(t)),
pkger.WithNotificationRuleSVC(l.NotificationRuleService()),
pkger.WithOrganizationService(l.OrganizationService()),
pkger.WithStore(pkger.NewStoreKV(l.kvStore)),
pkger.WithTaskSVC(l.TaskServiceKV()),
pkger.WithTelegrafSVC(l.TelegrafService(t)),
pkger.WithVariableSVC(l.VariableService(t)),
Expand Down Expand Up @@ -2262,6 +2268,7 @@ spec:

impact, err := svc.Apply(ctx, l.Org.ID, l.User.ID, pkg)
require.NoError(t, err)
assert.NotZero(t, impact.StackID)

require.Len(t, impact.Summary.Tasks, 1)
})
Expand Down Expand Up @@ -2372,6 +2379,7 @@ spec:

impact, err := svc.DryRun(ctx, l.Org.ID, l.User.ID, pkg)
require.NoError(t, err)
assert.Zero(t, impact.StackID)

sum := impact.Summary

Expand Down Expand Up @@ -2420,6 +2428,7 @@ spec:
"var-1-name-ref": "var_threeve",
}))
require.NoError(t, err)
assert.NotZero(t, impact.StackID)

sum = impact.Summary

Expand Down
2 changes: 2 additions & 0 deletions http/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7859,6 +7859,8 @@ components:
PkgSummary:
type: object
properties:
stackID:
type: string
summary:
type: object
properties:
Expand Down
10 changes: 8 additions & 2 deletions pkger/http_remote_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,14 @@ func (s *HTTPRemoteService) apply(ctx context.Context, orgID influxdb.ID, pkg *P
return PkgImpactSummary{}, err
}

return PkgImpactSummary{
impact := PkgImpactSummary{
Diff: resp.Diff,
Summary: resp.Summary,
}, NewParseError(resp.Errors...)
}

if stackID, err := influxdb.IDFromString(resp.StackID); err == nil {
impact.StackID = *stackID
}

return impact, NewParseError(resp.Errors...)
}
4 changes: 4 additions & 0 deletions pkger/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ func (r ReqApplyPkg) Pkgs(encoding Encoding) (*Pkg, error) {

// RespApplyPkg is the response body for the apply pkg endpoint.
type RespApplyPkg struct {
StackID string `json:"stackID" yaml:"stackID"`
Diff Diff `json:"diff" yaml:"diff"`
Summary Summary `json:"summary" yaml:"summary"`

Expand Down Expand Up @@ -481,6 +482,7 @@ func (s *HTTPServer) applyPkg(w http.ResponseWriter, r *http.Request) {
impact, err := s.svc.DryRun(r.Context(), *orgID, userID, parsedPkg, applyOpts...)
if IsParseErr(err) {
s.api.Respond(w, r, http.StatusUnprocessableEntity, RespApplyPkg{
StackID: impact.StackID.String(),
Diff: impact.Diff,
Summary: impact.Summary,
Errors: convertParseErr(err),
Expand All @@ -493,6 +495,7 @@ func (s *HTTPServer) applyPkg(w http.ResponseWriter, r *http.Request) {
}

s.api.Respond(w, r, http.StatusOK, RespApplyPkg{
StackID: impact.StackID.String(),
Diff: impact.Diff,
Summary: impact.Summary,
})
Expand All @@ -508,6 +511,7 @@ func (s *HTTPServer) applyPkg(w http.ResponseWriter, r *http.Request) {
}

s.api.Respond(w, r, http.StatusCreated, RespApplyPkg{
StackID: impact.StackID.String(),
Diff: impact.Diff,
Summary: impact.Summary,
Errors: convertParseErr(err),
Expand Down
15 changes: 12 additions & 3 deletions pkger/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,7 @@ func (s *Service) filterOrgResourceKinds(resourceKindFilters []Kind) []struct {

// PkgImpactSummary represents the impact the application of a pkg will have on the system.
type PkgImpactSummary struct {
StackID influxdb.ID
Diff Diff
Summary Summary
}
Expand Down Expand Up @@ -719,6 +720,7 @@ func (s *Service) DryRun(ctx context.Context, orgID, userID influxdb.ID, pkg *Pk
}

return PkgImpactSummary{
StackID: opt.StackID,
Diff: state.diff(),
Summary: newSummaryFromStatePkg(state, pkg),
}, nil
Expand Down Expand Up @@ -1215,11 +1217,17 @@ func (s *Service) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg
return PkgImpactSummary{}, err
}

defer func(stackID influxdb.ID) {
if stackID == 0 {
return
stackID := opt.StackID
// if stackID is not provided, a stack will be provided for the application.
if stackID == 0 {
newStack, err := s.InitStack(ctx, userID, Stack{OrgID: orgID})
if err != nil {
return PkgImpactSummary{}, err
}
stackID = newStack.ID
}

defer func(stackID influxdb.ID) {
updateStackFn := s.updateStackAfterSuccess
if e != nil {
updateStackFn = s.updateStackAfterRollback
Expand All @@ -1240,6 +1248,7 @@ func (s *Service) Apply(ctx context.Context, orgID, userID influxdb.ID, pkg *Pkg
pkg.applySecrets(opt.MissingSecrets)

return PkgImpactSummary{
StackID: stackID,
Diff: state.diff(),
Summary: newSummaryFromStatePkg(state, pkg),
}, nil
Expand Down
39 changes: 32 additions & 7 deletions pkger/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,26 @@ func TestService(t *testing.T) {
endpointSVC: mock.NewNotificationEndpointService(),
orgSVC: mock.NewOrganizationService(),
ruleSVC: mock.NewNotificationRuleStore(),
taskSVC: mock.NewTaskService(),
teleSVC: mock.NewTelegrafConfigStore(),
varSVC: mock.NewVariableService(),
store: &fakeStore{
createFn: func(ctx context.Context, stack Stack) error {
return nil
},
readFn: func(ctx context.Context, id influxdb.ID) (Stack, error) {
return Stack{ID: id}, nil
},
updateFn: func(ctx context.Context, stack Stack) error {
return nil
},
},
taskSVC: mock.NewTaskService(),
teleSVC: mock.NewTelegrafConfigStore(),
varSVC: mock.NewVariableService(),
}
for _, o := range opts {
o(&opt)
}

return NewService(
WithIDGenerator(opt.idGen),
WithTimeGenerator(opt.timeGen),
applyOpts := []ServiceSetterFn{
WithStore(opt.store),
WithBucketSVC(opt.bucketSVC),
WithCheckSVC(opt.checkSVC),
Expand All @@ -56,7 +65,15 @@ func TestService(t *testing.T) {
WithTaskSVC(opt.taskSVC),
WithTelegrafSVC(opt.teleSVC),
WithVariableSVC(opt.varSVC),
)
}
if opt.idGen != nil {
applyOpts = append(applyOpts, WithIDGenerator(opt.idGen))
}
if opt.timeGen != nil {
applyOpts = append(applyOpts, WithTimeGenerator(opt.timeGen))
}

return NewService(applyOpts...)
}

t.Run("DryRun", func(t *testing.T) {
Expand Down Expand Up @@ -3244,6 +3261,8 @@ func levelPtr(l notification.CheckLevel) *notification.CheckLevel {

type fakeStore struct {
createFn func(ctx context.Context, stack Stack) error
readFn func(ctx context.Context, id influxdb.ID) (Stack, error)
updateFn func(ctx context.Context, stack Stack) error
}

var _ Store = (*fakeStore)(nil)
Expand All @@ -3260,10 +3279,16 @@ func (s *fakeStore) ListStacks(ctx context.Context, orgID influxdb.ID, f ListFil
}

func (s *fakeStore) ReadStackByID(ctx context.Context, id influxdb.ID) (Stack, error) {
if s.readFn != nil {
return s.readFn(ctx, id)
}
panic("not implemented")
}

func (s *fakeStore) UpdateStack(ctx context.Context, stack Stack) error {
if s.updateFn != nil {
return s.updateFn(ctx, stack)
}
panic("not implemented")
}

Expand Down

0 comments on commit 53794bf

Please sign in to comment.