Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scale buildkit to zero #40

Merged
merged 7 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var cfg struct {
BuildKitAutoDiscoveryKubernetesPodSelector string
BuildKitAutoDiscoveryKubernetesNamespace string
BuildKitAutoDiscoveryKubernetesLeasePrefix string
BuildKitAutoDiscoveryScaleStatefulset string
wpjunior marked this conversation as resolved.
Show resolved Hide resolved
KubernetesConfig string
BuildKitAutoDiscoveryTimeout time.Duration
BuildKitAutoDiscoveryKubernetesPort int
Expand Down Expand Up @@ -71,6 +72,7 @@ func main() {
flag.IntVar(&cfg.BuildKitAutoDiscoveryKubernetesPort, "buildkit-autodiscovery-kubernetes-port", 80, "TCP port number which BuldKit's service is listening")
flag.BoolVar(&cfg.BuildKitAutoDiscoveryKubernetesSetTsuruAppLabels, "buildkit-autodiscovery-kubernetes-set-tsuru-app-labels", false, "Whether should set the Tsuru app labels in the selected BuildKit pod")
flag.BoolVar(&cfg.BuildKitAutoDiscoveryKubernetesUseSameNamespaceAsTsuruApp, "buildkit-autodiscovery-kubernetes-use-same-namespace-as-tsuru-app", false, "Whether should look for BuildKit in the Tsuru app's namespace")
flag.StringVar(&cfg.BuildKitAutoDiscoveryScaleStatefulset, "buildkit-autodiscovery-scale-statefulset", "", "Name of statefulset of buildkit that scale from zero")

flag.Parse()

Expand Down Expand Up @@ -170,6 +172,7 @@ func newBuildKit() (*buildkit.BuildKit, error) {
SetTsuruAppLabel: cfg.BuildKitAutoDiscoveryKubernetesSetTsuruAppLabels,
UseSameNamespaceAsApp: cfg.BuildKitAutoDiscoveryKubernetesUseSameNamespaceAsTsuruApp,
LeasePrefix: cfg.BuildKitAutoDiscoveryKubernetesLeasePrefix,
ScaleStatefulset: cfg.BuildKitAutoDiscoveryScaleStatefulset,
}

return b.WithKubernetesDiscovery(cs, dcs, kdopts), nil
Expand Down
13 changes: 10 additions & 3 deletions pkg/build/buildkit/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/client-go/kubernetes"

"github.com/tsuru/deploy-agent/pkg/build"
"github.com/tsuru/deploy-agent/pkg/build/gc"
pb "github.com/tsuru/deploy-agent/pkg/build/grpc_build_v1"
"github.com/tsuru/deploy-agent/pkg/util"
)
Expand Down Expand Up @@ -66,6 +67,7 @@ type KubernertesDiscoveryOptions struct {
PodSelector string
Namespace string
LeasePrefix string
ScaleStatefulset string
Port int
UseSameNamespaceAsApp bool
SetTsuruAppLabel bool
Expand All @@ -76,6 +78,11 @@ func (b *BuildKit) WithKubernetesDiscovery(cs *kubernetes.Clientset, dcs dynamic
b.k8s = cs
b.dk8s = dcs
b.kdopts = &opts

if opts.ScaleStatefulset != "" {
gc.Run(cs, opts.PodSelector, opts.ScaleStatefulset)
}

return b
}

Expand All @@ -100,7 +107,7 @@ func (b *BuildKit) Build(ctx context.Context, r *pb.BuildRequest, w io.Writer) (
return nil, errors.New("writer must implement console.File")
}

c, clientCleanUp, err := b.client(ctx, r)
c, clientCleanUp, err := b.client(ctx, r, w)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -539,15 +546,15 @@ func callBuildKitToExtractTsuruConfigs(ctx context.Context, c *client.Client, lo
return tc, nil
}

func (b *BuildKit) client(ctx context.Context, req *pb.BuildRequest) (*client.Client, func(), error) {
func (b *BuildKit) client(ctx context.Context, req *pb.BuildRequest, w io.Writer) (*client.Client, func(), error) {
isBuildForApp := strings.HasPrefix(pb.BuildKind_name[int32(req.Kind)], "BUILD_KIND_APP_")

if isBuildForApp && b.opts.DiscoverBuildKitClientForApp {
d := &k8sDiscoverer{
cs: b.k8s,
dcs: b.dk8s,
}
return d.Discover(ctx, *b.kdopts, req)
return d.Discover(ctx, *b.kdopts, req, w)
}

return b.cli, noopFunc, nil
Expand Down
35 changes: 27 additions & 8 deletions pkg/build/buildkit/k8s_autodiscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"context"
"encoding/json"
"fmt"
"io"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -52,15 +53,15 @@
dcs dynamic.Interface
}

func (d *k8sDiscoverer) Discover(ctx context.Context, opts KubernertesDiscoveryOptions, req *pb.BuildRequest) (*client.Client, func(), error) {
func (d *k8sDiscoverer) Discover(ctx context.Context, opts KubernertesDiscoveryOptions, req *pb.BuildRequest, w io.Writer) (*client.Client, func(), error) {
if req.App == nil {
return nil, noopFunc, fmt.Errorf("there's only support for discovering BuildKit pods from Tsuru apps")
}

return d.discoverBuildKitClientFromApp(ctx, opts, req.App.Name)
return d.discoverBuildKitClientFromApp(ctx, opts, req.App.Name, w)
}

func (d *k8sDiscoverer) discoverBuildKitClientFromApp(ctx context.Context, opts KubernertesDiscoveryOptions, app string) (*client.Client, func(), error) {
func (d *k8sDiscoverer) discoverBuildKitClientFromApp(ctx context.Context, opts KubernertesDiscoveryOptions, app string, w io.Writer) (*client.Client, func(), error) {
leaderCtx, leaderCancel := context.WithCancel(ctx)
cfns := []func(){
func() {
Expand All @@ -69,7 +70,7 @@
},
}

pod, err := d.discoverBuildKitPod(leaderCtx, opts, app)
pod, err := d.discoverBuildKitPod(leaderCtx, opts, app, w)
if err != nil {
return nil, cleanUps(cfns...), err
}
Expand All @@ -82,7 +83,7 @@
return nil, cleanUps(cfns...), fmt.Errorf("failed to set Tsuru app labels on BuildKit's pod: %w", err)
}

cfns = append(cfns, func() {

Check failure on line 86 in pkg/build/buildkit/k8s_autodiscovery.go

View workflow job for this annotation

GitHub Actions / lint

Function `discoverBuildKitClientFromApp$2` should pass the context parameter (contextcheck)
klog.V(4).Infoln("Removing Tsuru app labels in the pod", pod.Name)
nerr := unsetTsuruAppLabelOnBuildKitPod(context.Background(), d.cs, pod.Name, pod.Namespace)
if nerr != nil {
Expand All @@ -108,7 +109,7 @@
return c, cleanUps(cfns...), nil
}

func (d *k8sDiscoverer) discoverBuildKitPod(ctx context.Context, opts KubernertesDiscoveryOptions, app string) (*corev1.Pod, error) {
func (d *k8sDiscoverer) discoverBuildKitPod(ctx context.Context, opts KubernertesDiscoveryOptions, app string, w io.Writer) (*corev1.Pod, error) {
deadlineCtx, deadlineCancel := context.WithCancel(ctx)
defer deadlineCancel()

Expand All @@ -127,7 +128,7 @@
defer watchCancel() // watch cancellation must happen before than closing the pods channel

go func() {
nerr := watchBuildKitPods(watchCtx, d.cs, opts.PodSelector, ns, pods)
nerr := watchBuildKitPods(watchCtx, d.cs, opts, ns, pods, w)
if nerr != nil {
errCh <- nerr
}
Expand Down Expand Up @@ -208,9 +209,27 @@
return ns, nil
}

func watchBuildKitPods(ctx context.Context, cs *kubernetes.Clientset, labelSelector, ns string, pods chan<- *corev1.Pod) error {
func watchBuildKitPods(ctx context.Context, cs *kubernetes.Clientset, opts KubernertesDiscoveryOptions, ns string, pods chan<- *corev1.Pod, writer io.Writer) error {
if opts.ScaleStatefulset != "" {
stfullset, err := cs.AppsV1().StatefulSets(ns).Get(ctx, opts.ScaleStatefulset, metav1.GetOptions{})
if err != nil {
return err
}

if stfullset.Spec.Replicas == nil || *stfullset.Spec.Replicas == 0 {
fmt.Fprintln(writer, "There is no buildkits available, scaling to one replica")
wantedReplicas := int32(1)
stfullset.Spec.Replicas = &wantedReplicas

_, err := cs.AppsV1().StatefulSets(ns).Update(ctx, stfullset, metav1.UpdateOptions{})
if err != nil {
return err
}
}
}

w, err := cs.CoreV1().Pods(ns).Watch(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
LabelSelector: opts.PodSelector,
Watch: true,
})
if err != nil {
Expand Down
94 changes: 94 additions & 0 deletions pkg/build/gc/gc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package gc

Check failure on line 1 in pkg/build/gc/gc.go

View workflow job for this annotation

GitHub Actions / lint

Missed header for check (goheader)

import (
"context"
"fmt"
"strconv"
"time"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"
)

func Run(clientset *kubernetes.Clientset, podSelector, buildkitStefulset string) {
ctx := context.Background()

go func() {
for {
TickGC(ctx, clientset, podSelector, buildkitStefulset)

Check failure on line 19 in pkg/build/gc/gc.go

View workflow job for this annotation

GitHub Actions / lint

Error return value is not checked (errcheck)
time.Sleep(time.Minute * 5)
}
}()
}

const DeployAgentLastBuildEndingTimeLabelKey = "deploy-agent.tsuru.io/last-build-ending-time" // TODO: move to other place

func TickGC(ctx context.Context, clientset *kubernetes.Clientset, podSelector, buildkitStefulset string) error {
defer func() {
recoverErr := recover()
fmt.Println("print err", recoverErr)
}()

buildKitPods, err := clientset.CoreV1().Pods("*").List(ctx, v1.ListOptions{
LabelSelector: podSelector,
})

if err != nil {
return err
}

maxEndtimeByNS := map[string]int64{}

for _, pod := range buildKitPods.Items {
if pod.Annotations[DeployAgentLastBuildEndingTimeLabelKey] == "" {
maxEndtimeByNS[pod.Namespace] = -1 // mark that namespace has least one pod of buildkit running
continue
}

maxUsage, err := strconv.ParseInt(pod.Annotations[DeployAgentLastBuildEndingTimeLabelKey], 10, 64)
if err != nil {
klog.Errorf("failed to parseint: %s", err.Error())
continue
}

if maxEndtimeByNS[pod.Namespace] == -1 {
continue
}

if maxEndtimeByNS[pod.Namespace] < maxUsage {
maxEndtimeByNS[pod.Namespace] = maxUsage
}
}

now := time.Now().Unix()
gracefulPeriod := int64(60 * 30)
zero := int32(0)

for ns, maxEndtime := range maxEndtimeByNS {
if maxEndtime == -1 {
continue
}

if now-maxEndtime < gracefulPeriod {
continue
}

statefulset, err := clientset.AppsV1().StatefulSets(ns).Get(ctx, buildkitStefulset, v1.GetOptions{})

if err != nil {
klog.Errorf("failed to get statefullsets from ns: %s, err: %s", ns, err.Error())
continue
}

statefulset.Spec.Replicas = &zero
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: @gvicentin we may save the current replicas into a annotation


_, err = clientset.AppsV1().StatefulSets(ns).Update(ctx, statefulset, v1.UpdateOptions{})
if err != nil {
klog.Errorf("failed to update statefullsets from ns: %s, err: %s", ns, err.Error())
continue
}
}

return nil
}
Loading