Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix stage qualifications for flow control stages #999

Merged
merged 3 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 128 additions & 16 deletions internal/controller/stages/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,24 @@ type reconciler struct {
subs kargoapi.Subscriptions,
) (*kargoapi.Freight, error)

getAllFreightFromWarehouseFn func(
ctx context.Context,
namespace string,
warehouse string,
) ([]kargoapi.Freight, error)

getLatestFreightFromWarehouseFn func(
ctx context.Context,
namespace string,
warehouse string,
) (*kargoapi.Freight, error)

getAllFreightQualifiedForUpstreamStagesFn func(
ctx context.Context,
namespace string,
stageSubs []kargoapi.StageSubscription,
) ([]kargoapi.Freight, error)

getLatestFreightQualifiedForUpstreamStagesFn func(
ctx context.Context,
namespace string,
Expand Down Expand Up @@ -237,7 +249,9 @@ func newReconciler(kargoClient, argoClient client.Client) *reconciler {
r.createPromotionFn = kargoClient.Create
// Discovering latest Freight:
r.getLatestAvailableFreightFn = r.getLatestAvailableFreight
r.getAllFreightFromWarehouseFn = r.getAllFreightFromWarehouse
r.getLatestFreightFromWarehouseFn = r.getLatestFreightFromWarehouse
r.getAllFreightQualifiedForUpstreamStagesFn = r.getAllFreightQualifiedForUpstreamStages
r.getLatestFreightQualifiedForUpstreamStagesFn = r.getLatestFreightQualifiedForUpstreamStages
r.listFreightFn = r.kargoClient.List
return r
Expand Down Expand Up @@ -278,7 +292,11 @@ func (r *reconciler) Reconcile(
logger.Debug("found Stage")

var newStatus kargoapi.StageStatus
newStatus, err = r.syncStage(ctx, stage)
if stage.Spec.PromotionMechanisms == nil {
newStatus, err = r.syncControlFlowStage(ctx, stage)
} else {
newStatus, err = r.syncNormalStage(ctx, stage)
}
if err != nil {
newStatus.Error = err.Error()
logger.Errorf("error syncing Stage: %s", stage.Status.Error)
Expand Down Expand Up @@ -315,7 +333,78 @@ func (r *reconciler) Reconcile(
return result, err
}

func (r *reconciler) syncStage(
func (r *reconciler) syncControlFlowStage(
ctx context.Context,
stage *kargoapi.Stage,
) (kargoapi.StageStatus, error) {
status := *stage.Status.DeepCopy()
status.ObservedGeneration = stage.Generation
status.Health = nil // Reset health
status.CurrentPromotion = nil

// A Stage without promotion mechanisms shouldn't have a currentFreight. Make
// sure this is empty to avoid confusion. A reason this could be non-empty to
// begin with is that the Stage USED TO have promotion mechanisms, but they
// were removed, thus becoming a control flow Stage.
status.CurrentFreight = nil

// For now all available Freight (qualified upstream) should automatically and
// immediately be qualified for this Stage, making it available downstream. In
// the future, we may have more options before qualifying them (e.g. require
// that they were qualified in all our upstreams)
var availableFreight []kargoapi.Freight
var err error
if stage.Spec.Subscriptions.Warehouse != "" {
if availableFreight, err = r.getAllFreightFromWarehouseFn(
ctx,
stage.Namespace,
stage.Spec.Subscriptions.Warehouse,
); err != nil {
return status, errors.Wrapf(
err,
"error finding all Freight from Warehouse %q in namespace %q",
stage.Spec.Subscriptions.Warehouse,
stage.Namespace,
)
}
} else {
if availableFreight, err = r.getAllFreightQualifiedForUpstreamStagesFn(
ctx,
stage.Namespace,
stage.Spec.Subscriptions.UpstreamStages,
); err != nil {
return status, errors.Wrapf(
err,
"error finding available Freight for Stage %q in namespace %q",
stage.Name,
stage.Namespace,
)
}
}
for _, available := range availableFreight {
af := available // Avoid implicit memory aliasing
// Only bother to qualify if not already qualified
if _, qualified := af.Status.Qualifications[stage.Name]; !qualified {
newStatus := *af.Status.DeepCopy()
if newStatus.Qualifications == nil {
newStatus.Qualifications = map[string]kargoapi.Qualification{}
}
newStatus.Qualifications[stage.Name] = kargoapi.Qualification{}
if err = r.patchFreightStatusFn(ctx, &af, newStatus); err != nil {
return status, errors.Wrapf(
err,
"error qualifying Freight %q in namespace %q for Stage %q",
af.ID,
stage.Namespace,
stage.Name,
)
}
}
}
return status, nil
}

func (r *reconciler) syncNormalStage(
ctx context.Context,
stage *kargoapi.Stage,
) (kargoapi.StageStatus, error) {
Expand Down Expand Up @@ -345,14 +434,6 @@ func (r *reconciler) syncStage(
status.Health = nil // Reset health
status.CurrentPromotion = nil

if stage.Spec.PromotionMechanisms == nil {
// A Stage without promotion mechanisms shouldn't have a currentFreight.
// Make sure this is empty to avoid confusion. A reason it could be
// non-empty to begin with is that the Stage USED TO have promotion
// mechanisms, but they were removed.
status.CurrentFreight = nil
}

if status.CurrentFreight == nil {
logger.Debug("Stage has no current Freight; no health checks to perform")
} else { // Check health and qualify current Freight if applicable
Expand Down Expand Up @@ -664,11 +745,11 @@ func (r *reconciler) getLatestAvailableFreight(
return latestFreight, nil
}

func (r *reconciler) getLatestFreightFromWarehouse(
func (r *reconciler) getAllFreightFromWarehouse(
ctx context.Context,
namespace string,
warehouse string,
) (*kargoapi.Freight, error) {
) ([]kargoapi.Freight, error) {
var freight kargoapi.FreightList
if err := r.listFreightFn(
ctx,
Expand Down Expand Up @@ -696,14 +777,29 @@ func (r *reconciler) getLatestFreightFromWarehouse(
return freight.Items[j].CreationTimestamp.
Before(&freight.Items[i].CreationTimestamp)
})
return &freight.Items[0], nil
return freight.Items, nil
}

func (r *reconciler) getLatestFreightQualifiedForUpstreamStages(
func (r *reconciler) getLatestFreightFromWarehouse(
ctx context.Context,
namespace string,
stageSubs []kargoapi.StageSubscription,
warehouse string,
) (*kargoapi.Freight, error) {
freight, err := r.getAllFreightFromWarehouseFn(ctx, namespace, warehouse)
if err != nil {
return nil, err
}
if len(freight) == 0 {
return nil, nil
}
return &freight[0], nil
}

func (r *reconciler) getAllFreightQualifiedForUpstreamStages(
ctx context.Context,
namespace string,
stageSubs []kargoapi.StageSubscription,
) ([]kargoapi.Freight, error) {
// Start by building a de-duped map of Freight qualified for ANY upstream
// Stage
qualifiedFreight := map[string]kargoapi.Freight{}
Expand Down Expand Up @@ -746,5 +842,21 @@ func (r *reconciler) getLatestFreightQualifiedForUpstreamStages(
return qualifiedFreightList[j].CreationTimestamp.
Before(&qualifiedFreightList[i].CreationTimestamp)
})
return &qualifiedFreightList[0], nil
return qualifiedFreightList, nil
}

func (r *reconciler) getLatestFreightQualifiedForUpstreamStages(
ctx context.Context,
namespace string,
stageSubs []kargoapi.StageSubscription,
) (*kargoapi.Freight, error) {
qualifiedFreight, err :=
r.getAllFreightQualifiedForUpstreamStagesFn(ctx, namespace, stageSubs)
if err != nil {
return nil, err
}
if len(qualifiedFreight) == 0 {
return nil, nil
}
return &qualifiedFreight[0], nil
}
Loading