Skip to content

Commit

Permalink
Add controllerrevision for workspaceReconcile
Browse files Browse the repository at this point in the history
Signed-off-by: Bangqi Zhu <bangqizhu@microsoft.com>
  • Loading branch information
Bangqi Zhu committed Jul 18, 2024
1 parent 0cbb06f commit 2ecafd0
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 10 deletions.
3 changes: 3 additions & 0 deletions charts/kaito/workspace/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ rules:
- apiGroups: [ "apps" ]
resources: ["deployments" ]
verbs: ["get","list","watch","create", "delete","update", "patch"]
- apiGroups: [ "apps" ]
resources: ["controllerrevisions" ]
verbs: [ "get","list","watch","create", "delete","update", "patch"]
- apiGroups: [ "apps" ]
resources: [ "statefulsets" ]
verbs: [ "get","list","watch","create", "delete","update", "patch" ]
Expand Down
12 changes: 6 additions & 6 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/signals"
"knative.dev/pkg/webhook"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
Expand Down Expand Up @@ -53,6 +54,7 @@ var (
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))

//utilruntime.Must(v1.AddToScheme(scheme))
utilruntime.Must(kaitov1alpha1.AddToScheme(scheme))
utilruntime.Must(v1alpha5.SchemeBuilder.AddToScheme(scheme))
utilruntime.Must(v1beta1.SchemeBuilder.AddToScheme(scheme))
Expand Down Expand Up @@ -109,12 +111,10 @@ func main() {

k8sclient.SetGlobalClient(mgr.GetClient())

if err = (&controllers.WorkspaceReconciler{
Client: k8sclient.GetGlobalClient(),
Log: log.Log.WithName("controllers").WithName("Workspace"),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("KAITO-Workspace-controller"),
}).SetupWithManager(mgr); err != nil {
workspaceReconciler := controllers.NewWorkspaceReconciler(k8sclient.GetGlobalClient(),
mgr.GetScheme(), log.Log.WithName("controllers").WithName("Workspace"), mgr.GetEventRecorderFor("KAITO-Workspace-controller"))

if err = workspaceReconciler.SetupWithManager(mgr); err != nil {
klog.ErrorS(err, "unable to create controller", "controller", "Workspace")
exitWithErrorFunc()
}
Expand Down
93 changes: 89 additions & 4 deletions pkg/controllers/workspace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ package controllers

import (
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
"fmt"
"sort"
"strings"
Expand Down Expand Up @@ -46,13 +49,26 @@ import (
const (
gpuSkuPrefix = "Standard_N"
nodePluginInstallTimeout = 60 * time.Second
WorkspaceRevisionLabel = "workspace.kaito.io/revision"
WorkspaceNameLabel = "workspace.kaito.io/name"
)

type WorkspaceReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
revisionCache []string
}

func NewWorkspaceReconciler(client client.Client, scheme *runtime.Scheme, log logr.Logger, Recorder record.EventRecorder) *WorkspaceReconciler {
return &WorkspaceReconciler{
Client: client,
Scheme: scheme,
Log: log,
Recorder: Recorder,
revisionCache: []string{},
}
}

func (c *WorkspaceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
Expand All @@ -66,6 +82,10 @@ func (c *WorkspaceReconciler) Reconcile(ctx context.Context, req reconcile.Reque

klog.InfoS("Reconciling", "workspace", req.NamespacedName)

if err := c.updateControllerRevision(ctx, workspaceObj); err != nil {
return reconcile.Result{}, err
}

// Handle deleting workspace, garbage collect all the resources.
if !workspaceObj.DeletionTimestamp.IsZero() {
return c.deleteWorkspace(ctx, workspaceObj)
Expand All @@ -89,7 +109,17 @@ func (c *WorkspaceReconciler) Reconcile(ctx context.Context, req reconcile.Reque
}
}

return c.addOrUpdateWorkspace(ctx, workspaceObj)
result, err := c.addOrUpdateWorkspace(ctx, workspaceObj)
if err != nil {
return result, err
}

if err := c.updateControllerRevision(ctx, workspaceObj); err != nil {
klog.ErrorS(err, "failed to update ControllerRevision", "workspace", klog.KObj(workspaceObj))
return reconcile.Result{}, err
}

return result, nil
}

func (c *WorkspaceReconciler) addOrUpdateWorkspace(ctx context.Context, wObj *kaitov1alpha1.Workspace) (reconcile.Result, error) {
Expand Down Expand Up @@ -152,6 +182,60 @@ func (c *WorkspaceReconciler) deleteWorkspace(ctx context.Context, wObj *kaitov1

return c.garbageCollectWorkspace(ctx, wObj)
}
func (c *WorkspaceReconciler) updateControllerRevision(ctx context.Context, wObj *kaitov1alpha1.Workspace) error {
needNewRevision := false
var currentHash string
if len(c.revisionCache) == 0 {
needNewRevision = true
} else {
currentHash = computeHash(wObj)
lastCachedHash := c.revisionCache[len(c.revisionCache)-1]
if currentHash != lastCachedHash {
needNewRevision = true
}
}
if !needNewRevision {
return nil
}
if currentHash == "" {
currentHash = computeHash(wObj)
}
data := map[string]string{"hash": currentHash}
jsonData, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("failed to marshal revision data: %w", err)
}
newRevision := &appsv1.ControllerRevision{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", wObj.Name, currentHash[:8]),
Namespace: wObj.Namespace,
Labels: map[string]string{
WorkspaceRevisionLabel: "true",
WorkspaceNameLabel: wObj.Name,
},
},
Revision: int64(len(c.revisionCache) + 1),
Data: runtime.RawExtension{Raw: jsonData},
}
if err := c.Client.Create(ctx, newRevision); err != nil {
return fmt.Errorf("failed to create new ControllerRevision: %w", err)
}
c.revisionCache = append(c.revisionCache, currentHash)
const maxRevisionHistoryLimit = 10
if len(c.revisionCache) > maxRevisionHistoryLimit {
c.revisionCache = c.revisionCache[1:]
}
return nil
}

func computeHash(w *kaitov1alpha1.Workspace) string {
hasher := md5.New()
encoder := json.NewEncoder(hasher)
encoder.Encode(w.Resource)
encoder.Encode(w.Inference)
encoder.Encode(w.Tuning)
return hex.EncodeToString(hasher.Sum(nil))
}

func (c *WorkspaceReconciler) selectWorkspaceNodes(qualified []*corev1.Node, preferred []string, previous []string, count int) []*corev1.Node {

Expand Down Expand Up @@ -633,6 +717,7 @@ func (c *WorkspaceReconciler) SetupWithManager(mgr ctrl.Manager) error {

builder := ctrl.NewControllerManagedBy(mgr).
For(&kaitov1alpha1.Workspace{}).
Owns(&appsv1.ControllerRevision{}).
Owns(&appsv1.Deployment{}).
Owns(&appsv1.StatefulSet{}).
Watches(&v1alpha5.Machine{}, c.watchMachines()).
Expand Down

0 comments on commit 2ecafd0

Please sign in to comment.