Skip to content

Commit

Permalink
feat(gc): garbage collect old freight (akuity#1600)
Browse files Browse the repository at this point in the history
Signed-off-by: Kent Rancourt <kent.rancourt@gmail.com>
  • Loading branch information
krancour authored Mar 13, 2024
1 parent f6fd5d1 commit 9f09705
Show file tree
Hide file tree
Showing 16 changed files with 1,082 additions and 350 deletions.
24 changes: 13 additions & 11 deletions charts/kargo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,16 @@ the Kargo controller is running.

### Garbage Collector

| Name | Description | Value |
| ---------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------- |
| `garbageCollector.enabled` | Whether the garbage collector is enabled. | `true` |
| `garbageCollector.schedule` | When to run the garbage collector. | `0 * * * *` |
| `garbageCollector.workers` | The number of concurrent workers to run. Tuning this too low will result in slow garbage collection. Tuning this too high will result in too many API calls and may result in throttling. | `3` |
| `garbageCollector.maxRetainedPromotions` | The maximum number of Promotions in terminal phases PER PROJECT that may be spared by the garbage collector. | `20` |
| `garbageCollector.logLevel` | The log level for the garbage collector. | `INFO` |
| `garbageCollector.resources` | Resources limits and requests for the garbage collector containers. | `{}` |
| `garbageCollector.nodeSelector` | Node selector for the garbage collector pods. | `{}` |
| `garbageCollector.tolerations` | Tolerations for the garbage collector pods. | `[]` |
| `garbageCollector.affinity` | Specifies pod affinity for the garbage collector pods. | `{}` |
| Name | Description | Value |
| ---------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ----------- |
| `garbageCollector.enabled` | Whether the garbage collector is enabled. | `true` |
| `garbageCollector.schedule` | When to run the garbage collector. | `0 * * * *` |
| `garbageCollector.workers` | The number of concurrent workers to run. Tuning this too low will result in slow garbage collection. Tuning this too high will result in too many API calls and may result in throttling. | `3` |
| `garbageCollector.maxRetainedPromotions` | The maximum number of Promotions in terminal phases PER PROJECT that may be spared by the garbage collector. | `20` |
| `garbageCollector.maxRetainedFreight` | The ideal maximum number of Freight OLDER than the oldest still in use (from each Warehouse) that may be spared by the garbage collector. The ACTUAL number of older Freight spared may exceed this ideal if some Freight that would otherwise be deleted do not meet the minimum age criterion. | `20` |
| `garbageCollector.minFreightDeletionAge` | The minimum age Freight must be before considered eligible for garbage collection. | `336h` |
| `garbageCollector.logLevel` | The log level for the garbage collector. | `INFO` |
| `garbageCollector.resources` | Resources limits and requests for the garbage collector containers. | `{}` |
| `garbageCollector.nodeSelector` | Node selector for the garbage collector pods. | `{}` |
| `garbageCollector.tolerations` | Tolerations for the garbage collector pods. | `[]` |
| `garbageCollector.affinity` | Specifies pod affinity for the garbage collector pods. | `{}` |
10 changes: 10 additions & 0 deletions charts/kargo/templates/garbage-collector/cluster-roles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ rules:
- apiGroups:
- kargo.akuity.io
resources:
- stages
- warehouses
verbs:
- get
- list
- watch
- apiGroups:
- kargo.akuity.io
resources:
- freights
- promotions
verbs:
- delete
Expand Down
2 changes: 2 additions & 0 deletions charts/kargo/templates/garbage-collector/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ data:
LOG_LEVEL: {{ quote .Values.garbageCollector.logLevel }}
NUM_WORKERS: {{ quote .Values.garbageCollector.workers }}
MAX_RETAINED_PROMOTIONS: {{ quote .Values.garbageCollector.maxRetainedPromotions }}
MAX_RETAINED_FREIGHT: {{ quote .Values.garbageCollector.maxRetainedFreight }}
MIN_FREIGHT_DELETION_AGE: {{ quote .Values.garbageCollector.minFreightDeletionAge }}
{{- end }}
4 changes: 4 additions & 0 deletions charts/kargo/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,10 @@ garbageCollector:
workers: 3
## @param garbageCollector.maxRetainedPromotions The maximum number of Promotions in terminal phases PER PROJECT that may be spared by the garbage collector.
maxRetainedPromotions: 20
## @param garbageCollector.maxRetainedFreight The ideal maximum number of Freight OLDER than the oldest still in use (from each Warehouse) that may be spared by the garbage collector. The ACTUAL number of older Freight spared may exceed this ideal if some Freight that would otherwise be deleted do not meet the minimum age criterion.
maxRetainedFreight: 20
## @param garbageCollector.minFreightDeletionAge The minimum age Freight must be before considered eligible for garbage collection.
minFreightDeletionAge: 336h # Two weeks
## @param garbageCollector.logLevel The log level for the garbage collector.
logLevel: INFO
## @param garbageCollector.resources Resources limits and requests for the garbage collector containers.
Expand Down
41 changes: 35 additions & 6 deletions cmd/controlplane/garbage_collector.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
package main

import (
"context"
"errors"
"fmt"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"

kargoapi "github.com/akuity/kargo/api/v1alpha1"
"github.com/akuity/kargo/internal/api/kubernetes"
"github.com/akuity/kargo/internal/garbage"
"github.com/akuity/kargo/internal/kubeclient"
"github.com/akuity/kargo/internal/os"
versionpkg "github.com/akuity/kargo/internal/version"
)
Expand All @@ -33,7 +38,7 @@ func newGarbageCollectorCommand() *cobra.Command {

cfg := garbage.CollectorConfigFromEnv()

var kubeClient client.Client
var mgr manager.Manager
{
restCfg, err :=
kubernetes.GetRestConfig(ctx, os.GetEnv("KUBECONFIG", ""))
Expand All @@ -47,17 +52,41 @@ func newGarbageCollectorCommand() *cobra.Command {
if err = kargoapi.AddToScheme(scheme); err != nil {
return fmt.Errorf("error adding Kargo API to scheme: %w", err)
}
if kubeClient, err = client.New(
if mgr, err = ctrl.NewManager(
restCfg,
client.Options{
ctrl.Options{
Scheme: scheme,
Metrics: server.Options{
BindAddress: "0",
},
},
); err != nil {
return fmt.Errorf("err: %w", err)
return fmt.Errorf("error initializing controller manager: %w", err)
}
// Index Freight by Warehouse
if err = kubeclient.IndexFreightByWarehouse(ctx, mgr); err != nil {
return fmt.Errorf("error indexing Freight by Warehouse: %w", err)
}
// Index Stages by Freight
if err = kubeclient.IndexStagesByFreight(ctx, mgr); err != nil {
return fmt.Errorf("error indexing Stages by Freight: %w", err)
}
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

go func() {
if err := mgr.Start(ctx); err != nil {
panic(fmt.Errorf("start manager: %w", err))
}
}()

if !mgr.GetCache().WaitForCacheSync(ctx) {
return errors.New("error waiting for cache sync")
}

return garbage.NewCollector(kubeClient, cfg).Run(ctx)
return garbage.NewCollector(mgr.GetClient(), cfg).Run(ctx)
},
}
}
2 changes: 2 additions & 0 deletions hack/tilt/values.dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ managementController:
logLevel: DEBUG
webhooksServer:
logLevel: DEBUG
garbageCollector:
logLevel: DEBUG
143 changes: 51 additions & 92 deletions internal/garbage/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@ import (
"errors"
"fmt"
"math"
"sort"
"sync"
"time"

"github.com/kelseyhightower/envconfig"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"

kargoapi "github.com/akuity/kargo/api/v1alpha1"
"github.com/akuity/kargo/internal/logging"
)

// CollectorConfig is configuration for the garbage collector.
Expand All @@ -27,6 +26,15 @@ type CollectorConfig struct {
// MaxRetainedPromotions specifies the maximum number of Promotions in
// terminal phases per Project that may be spared by the garbage collector.
MaxRetainedPromotions int `envconfig:"MAX_RETAINED_PROMOTIONS" default:"20"`
// MaxRetainedFreight specifies the ideal maximum number of Freight OLDER than
// the oldest still in use (from each Warehouse) that may be spared by the
// garbage collector. The ACTUAL number of older Freight spared may exceed
// this ideal if some Freight that would otherwise be deleted do not meet the
// minimum age criterion.
MaxRetainedFreight int `envconfig:"MAX_RETAINED_FREIGHT" default:"20"`
// MinFreightDeletionAge specifies the minimum age Freight must be before
// considered eligible for garbage collection.
MinFreightDeletionAge time.Duration `envconfig:"MIN_FREIGHT_DELETION_AGE" default:"336h"` // 2 weeks
}

// CollectorConfigFromEnv returns a CollectorConfig populated from environment
Expand Down Expand Up @@ -60,6 +68,8 @@ type collector struct {
project string,
) error

cleanProjectPromotionsFn func(context.Context, string) error

listProjectsFn func(
context.Context,
client.ObjectList,
Expand All @@ -77,6 +87,38 @@ type collector struct {
client.Object,
...client.DeleteOption,
) error

cleanProjectFreightFn func(context.Context, string) error

listWarehousesFn func(
context.Context,
client.ObjectList,
...client.ListOption,
) error

cleanWarehouseFreightFn func(
ctx context.Context,
project string,
warehouse string,
) error

listFreightFn func(
context.Context,
client.ObjectList,
...client.ListOption,
) error

listStagesFn func(
context.Context,
client.ObjectList,
...client.ListOption,
) error

deleteFreightFn func(
context.Context,
client.Object,
...client.DeleteOption,
) error
}

// NewCollector initializes and returns an implementation of the Collector
Expand All @@ -87,9 +129,16 @@ func NewCollector(kubeClient client.Client, cfg CollectorConfig) Collector {
}
c.cleanProjectsFn = c.cleanProjects
c.cleanProjectFn = c.cleanProject
c.cleanProjectPromotionsFn = c.cleanProjectPromotions
c.listProjectsFn = kubeClient.List
c.listPromotionsFn = kubeClient.List
c.deletePromotionFn = kubeClient.Delete
c.cleanProjectFreightFn = c.cleanProjectFreight
c.listWarehousesFn = kubeClient.List
c.cleanWarehouseFreightFn = c.cleanWarehouseFreight
c.listFreightFn = kubeClient.List
c.listStagesFn = kubeClient.List
c.deleteFreightFn = kubeClient.Delete
return c
}

Expand Down Expand Up @@ -172,93 +221,3 @@ func (c *collector) Run(ctx context.Context) error {

return nil
}

// cleanProjects is a worker function that receives Project names over a channel
// until that channel is closed. It will execute garbage collection for each
// Project name received.
func (c *collector) cleanProjects(
ctx context.Context,
projectCh <-chan string,
errCh chan<- struct{},
) {
for {
select {
case project, ok := <-projectCh:
if !ok {
return // Channel was closed
}
if err := c.cleanProjectFn(ctx, project); err != nil {
select {
case errCh <- struct{}{}:
case <-ctx.Done():
return
}
}
case <-ctx.Done():
return
}
}
}

// cleanProject executes garbage collection for a single Project.
func (c *collector) cleanProject(ctx context.Context, project string) error {
logger := logging.LoggerFromContext(ctx).WithField("project", project)

promos := kargoapi.PromotionList{}
if err := c.listPromotionsFn(
ctx,
&promos,
client.InNamespace(project),
); err != nil {
return fmt.Errorf("error listing Promotions for Project %q: %w", project, err)
}

if len(promos.Items) <= c.cfg.MaxRetainedPromotions {
return nil // Done
}

// Sort Promotions by creation time
sort.Sort(byCreation(promos.Items))

// Delete oldest Promotions (in terminal phases only) that are in excess of
// MaxRetainedPromotions
var deleteErrCount int
for i := c.cfg.MaxRetainedPromotions; i < len(promos.Items); i++ {
promo := promos.Items[i]
if promo.Status.Phase.IsTerminal() {
promoLogger := logger.WithField("promotion", promo.Name)
if err := c.deletePromotionFn(ctx, &promo); err != nil {
promoLogger.Errorf("error deleting Promotion: %s", err)
deleteErrCount++
} else {
promoLogger.Debug("deleted Promotion")
}
}
}

if deleteErrCount > 0 {
return fmt.Errorf(
"error deleting one or more Promotions from Project %q",
project,
)
}

return nil
}

// byCreation implements sort.Interface for []kargoapi.Promotion.
type byCreation []kargoapi.Promotion

func (b byCreation) Len() int {
return len(b)
}

func (b byCreation) Swap(i, j int) {
b[i], b[j] = b[j], b[i]
}

func (b byCreation) Less(i, j int) bool {
return b[i].ObjectMeta.CreationTimestamp.Time.After(
b[j].ObjectMeta.CreationTimestamp.Time,
)
}
Loading

0 comments on commit 9f09705

Please sign in to comment.