Skip to content

Commit

Permalink
feat: cd event test workflow atifact
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Sukhin <vladislav@kubeshop.io>
  • Loading branch information
vsukhin committed Jul 3, 2024
1 parent 27888b1 commit c02c3d3
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 84 deletions.
1 change: 1 addition & 0 deletions cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,7 @@ func main() {
cfg.EnableImageDataPersistentCache,
cfg.ImageDataPersistentCacheKey,
cfg.TestkubeDashboardURI,
clusterId,
)

go testWorkflowExecutor.Recover(context.Background())
Expand Down
60 changes: 56 additions & 4 deletions cmd/testworkflow-toolkit/artifacts/handler.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
package artifacts

import (
"context"
"fmt"
"io/fs"
"path/filepath"
"sync/atomic"

cdevents "github.com/cdevents/sdk-go/pkg/api"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/dustin/go-humanize"
"github.com/gabriel-vasile/mimetype"

cde "github.com/kubeshop/testkube/pkg/mapper/cdevents"
"github.com/kubeshop/testkube/pkg/ui"
)

type handler struct {
uploader Uploader
processor Processor
postProcessor PostProcessor
pathPrefix string
uploader Uploader
processor Processor
postProcessor PostProcessor
pathPrefix string
cdeventsClient cloudevents.Client
cdeventsArtifactParameters cde.CDEventsArtifactParameters

success atomic.Uint32
errors atomic.Uint32
Expand All @@ -42,6 +49,22 @@ func WithPathPrefix(pathPrefix string) HandlerOpts {
}
}

func WithCDEventsTarget(cdEventsTarget string) HandlerOpts {
return func(h *handler) {
var err error
h.cdeventsClient, err = cloudevents.NewClientHTTP(cloudevents.WithTarget(cdEventsTarget))
if err != nil {
fmt.Printf(ui.LightYellow("failed to create cloud event client: %s"), err.Error())
}
}
}

func WithCDEventsArtifactParameters(cdeventsArtifactParameters cde.CDEventsArtifactParameters) HandlerOpts {
return func(h *handler) {
h.cdeventsArtifactParameters = cdeventsArtifactParameters
}
}

func NewHandler(uploader Uploader, processor Processor, opts ...HandlerOpts) Handler {
h := &handler{
uploader: uploader,
Expand Down Expand Up @@ -82,6 +105,12 @@ func (h *handler) Add(path string, file fs.File, stat fs.FileInfo) (err error) {
err = h.processor.Add(h.uploader, uploadPath, file, stat)
if err == nil {
h.success.Add(1)
if h.cdeventsClient != nil {
err = h.sendCDEvent(path)
if err != nil {
fmt.Printf(ui.LightYellow("failed to send cd event: %s"), err.Error())
}
}
} else {
h.errors.Add(1)
fmt.Printf(ui.Red("%s: failed: %s"), uploadPath, err.Error())
Expand Down Expand Up @@ -128,3 +157,26 @@ func (h *handler) End() (err error) {
}
return nil
}

func (h *handler) sendCDEvent(path string) error {
mtype, err := mimetype.DetectFile(path)
if err != nil {
return err
}

ev, err := cde.MapTestkubeTestWorkflowArtifactToCDEvent(h.cdeventsArtifactParameters, path, mtype.String())
if err != nil {
return err
}

ce, err := cdevents.AsCloudEvent(ev)
if err != nil {
return err
}

if result := h.cdeventsClient.Send(context.Background(), *ce); cloudevents.IsUndelivered(result) {
return fmt.Errorf("failed to send, %v", result)
}

return nil
}
13 changes: 13 additions & 0 deletions cmd/testworkflow-toolkit/commands/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/kubeshop/testkube/cmd/testworkflow-toolkit/artifacts"
"github.com/kubeshop/testkube/cmd/testworkflow-toolkit/env"
"github.com/kubeshop/testkube/pkg/mapper/cdevents"
"github.com/kubeshop/testkube/pkg/ui"
)

Expand Down Expand Up @@ -146,6 +147,18 @@ func NewArtifactsCmd() *cobra.Command {
handlerOpts = append(handlerOpts, artifacts.WithPathPrefix(env.Config().Execution.FSPrefix))
}

// Support cd evaents
if env.Config().System.CDEventTarget != "" {
handlerOpts = append(handlerOpts, artifacts.WithCDEventsTarget(env.Config().System.CDEventTarget))
handlerOpts = append(handlerOpts, artifacts.WithCDEventsArtifactParameters(cdevents.CDEventsArtifactParameters{
Id: env.Config().Execution.Id,
Name: env.Config().Execution.Name,
WorkflowName: env.Config().Execution.WorkflowName,
ClusterID: env.Config().System.ClusterID,
DashboardURI: env.Config().System.DashboardUrl,
}))
}

handler := artifacts.NewHandler(uploader, processor, handlerOpts...)

run(handler, walker, os.DirFS("/"))
Expand Down
2 changes: 2 additions & 0 deletions cmd/testworkflow-toolkit/env/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type envSystemConfig struct {
Ip string `envconfig:"TK_IP"`
DashboardUrl string `envconfig:"TK_DASH"`
ApiUrl string `envconfig:"TK_API"`
ClusterID string `envconfig:"TK_CLU"`
CDEventTarget string `envconfig:"TK_CDE"`
}

type envImagesConfig struct {
Expand Down
22 changes: 22 additions & 0 deletions pkg/api/v1/testkube/model_test_workflow_execution_extended.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (e *TestWorkflowExecution) GetNamespace(defaultNamespace string) string {
}

func (e *TestWorkflowExecution) ContainsExecuteAction() bool {
if e == nil {
return false
}

if e.ResolvedWorkflow == nil || e.ResolvedWorkflow.Spec == nil {
return false
}
Expand All @@ -60,3 +64,21 @@ func (e *TestWorkflowExecution) ContainsExecuteAction() bool {

return false
}

func (e *TestWorkflowExecution) GetTemplateRefs() []TestWorkflowTemplateRef {
if e == nil {
return nil
}

if e.ResolvedWorkflow == nil || e.ResolvedWorkflow.Spec == nil {
return nil
}

var templateRefs []TestWorkflowTemplateRef
steps := append(e.ResolvedWorkflow.Spec.Setup, append(e.ResolvedWorkflow.Spec.Steps, e.ResolvedWorkflow.Spec.After...)...)
for _, step := range steps {
templateRefs = append(templateRefs, step.GetTemplateRefs()...)
}

return templateRefs
}
21 changes: 20 additions & 1 deletion pkg/api/v1/testkube/model_test_workflow_step_extended.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,28 @@ func (w *TestWorkflowStep) ContainsExecuteAction() bool {
}
}

if w.Parallel.ContainsExecuteAction() {
if w.Parallel != nil && w.Parallel.ContainsExecuteAction() {
return true
}

return false
}

func (w *TestWorkflowStep) GetTemplateRefs() []TestWorkflowTemplateRef {
var templateRefs []TestWorkflowTemplateRef

if w.Template != nil {
templateRefs = append(templateRefs, *w.Template)
}

steps := append(w.Setup, w.Steps...)
for _, step := range steps {
templateRefs = append(templateRefs, step.GetTemplateRefs()...)
}

if w.Parallel != nil {
templateRefs = append(templateRefs, w.Parallel.GetTemplateRefs()...)
}

return templateRefs
}
15 changes: 15 additions & 0 deletions pkg/api/v1/testkube/model_test_workflow_step_parallel_extended.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,18 @@ func (w *TestWorkflowStepParallel) ContainsExecuteAction() bool {

return false
}

func (w *TestWorkflowStepParallel) GetTemplateRefs() []TestWorkflowTemplateRef {
var templateRefs []TestWorkflowTemplateRef

if w.Template != nil {
templateRefs = append(templateRefs, *w.Template)
}

steps := append(w.Setup, append(w.Steps, w.After...)...)
for _, step := range steps {
templateRefs = append(templateRefs, step.GetTemplateRefs()...)
}

return templateRefs
}
Loading

0 comments on commit c02c3d3

Please sign in to comment.