Skip to content

Commit

Permalink
Update upgrade reconciler to handle subclusters in sandbox (#772)
Browse files Browse the repository at this point in the history
This change will modify the upgrade reconcilers, both offline and
online, to function in either the main cluster or a sandbox. Our
immediate plan is to use the offline upgrade reconciler within the
sandbox controller, although this will be done in a follow-on task.
  • Loading branch information
roypaulin authored and cchen-vertica committed Jul 17, 2024
1 parent bb67fb1 commit 688289a
Show file tree
Hide file tree
Showing 17 changed files with 282 additions and 132 deletions.
2 changes: 1 addition & 1 deletion pkg/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1312,7 +1312,7 @@ func BuildPod(vdb *vapi.VerticaDB, sc *vapi.Subcluster, podIndex int32) *corev1.
ObjectMeta: metav1.ObjectMeta{
Name: nm.Name,
Namespace: nm.Namespace,
Labels: MakeLabelsForPodObject(vdb, sc),
Labels: MakeLabelsForSandboxPodObject(vdb, sc),
Annotations: MakeAnnotationsForObject(vdb),
},
Spec: buildPodSpec(vdb, sc),
Expand Down
11 changes: 11 additions & 0 deletions pkg/builder/labels_annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ func MakeLabelsForPodObject(vdb *vapi.VerticaDB, sc *vapi.Subcluster) map[string
return makeLabelsForObject(vdb, sc, true)
}

// MakeLabelsForSandboxPodObject constructs the labels that are common for all pods plus
// the sandbox name label. It is for testing purposes.
func MakeLabelsForSandboxPodObject(vdb *vapi.VerticaDB, sc *vapi.Subcluster) map[string]string {
labels := makeLabelsForObject(vdb, sc, true)
sandbox := vdb.GetSubclusterSandboxName(sc.Name)
if sandbox != vapi.MainCluster {
labels[vmeta.SandboxNameLabel] = sandbox
}
return labels
}

// MakeLabelsForStsObject constructs the labels that are common for all statefulsets.
func MakeLabelsForStsObject(vdb *vapi.VerticaDB, sc *vapi.Subcluster) map[string]string {
labels := makeLabelsForObject(vdb, sc, false)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/vdb/crashloop_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (c *CrashLoopReconciler) Reconcile(ctx context.Context, _ *ctrl.Request) (c

func (c *CrashLoopReconciler) reconcileStatefulSets(ctx context.Context) {
finder := iter.MakeSubclusterFinder(c.VRec.Client, c.VDB)
stss, err := finder.FindStatefulSets(ctx, iter.FindExisting|iter.FindSorted)
stss, err := finder.FindStatefulSets(ctx, iter.FindExisting|iter.FindSorted, vapi.MainCluster)
if err != nil {
// This reconciler is a best effort. It only tries to surface meaningful
// error messages based on the events it see. For this reason, no errors
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/vdb/dbremovesubcluster_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (d *DBRemoveSubclusterReconciler) resetDefaultSubcluster(ctx context.Contex
scFinder := iter.MakeSubclusterFinder(d.VRec.Client, d.Vdb)
// We use the FindServices() API to get subclusters that already exist.
// We can only change the default subcluster to one of those.
svcs, err := scFinder.FindServices(ctx, iter.FindInVdb)
svcs, err := scFinder.FindServices(ctx, iter.FindInVdb, vapi.MainCluster)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/controllers/vdb/obj_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,9 @@ func (o *ObjReconciler) checkForDeletedSubcluster(ctx context.Context) (ctrl.Res

finder := iter.MakeSubclusterFinder(o.VRec.Client, o.Vdb)

sandbox := o.PFacts.GetSandboxName()
// Find any statefulsets that need to be deleted
stss, err := finder.FindStatefulSets(ctx, iter.FindNotInVdb)
stss, err := finder.FindStatefulSets(ctx, iter.FindNotInVdb, sandbox)
if err != nil {
return ctrl.Result{}, err
}
Expand All @@ -258,7 +259,7 @@ func (o *ObjReconciler) checkForDeletedSubcluster(ctx context.Context) (ctrl.Res
}

// Find any service objects that need to be deleted
svcs, err := finder.FindServices(ctx, iter.FindNotInVdb)
svcs, err := finder.FindServices(ctx, iter.FindNotInVdb, vapi.MainCluster)
if err != nil {
return ctrl.Result{}, err
}
Expand Down
38 changes: 28 additions & 10 deletions pkg/controllers/vdb/offlineupgrade_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ var OfflineUpgradeStatusMsgs = []string{
}

// MakeOfflineUpgradeReconciler will build an OfflineUpgradeReconciler object
func MakeOfflineUpgradeReconciler(vdbrecon *VerticaDBReconciler, log logr.Logger,
vdb *vapi.VerticaDB, prunner cmds.PodRunner, pfacts *PodFacts, dispatcher vadmin.Dispatcher) controllers.ReconcileActor {
func MakeOfflineUpgradeReconciler(vdbrecon *VerticaDBReconciler, log logr.Logger, vdb *vapi.VerticaDB,
prunner cmds.PodRunner, pfacts *PodFacts, dispatcher vadmin.Dispatcher) controllers.ReconcileActor {
return &OfflineUpgradeReconciler{
VRec: vdbrecon,
Log: log.WithName("OfflineUpgradeReconciler"),
Expand All @@ -80,7 +80,12 @@ func MakeOfflineUpgradeReconciler(vdbrecon *VerticaDBReconciler, log logr.Logger
// Reconcile will handle the process of the vertica image changing. For
// example, this can automate the process for an upgrade.
func (o *OfflineUpgradeReconciler) Reconcile(ctx context.Context, _ *ctrl.Request) (ctrl.Result, error) {
if ok, err := o.Manager.IsUpgradeNeeded(ctx); !ok || err != nil {
sandbox := o.PFacts.GetSandboxName()
if ok, err := o.Manager.IsUpgradeNeeded(ctx, sandbox); !ok || err != nil {
return ctrl.Result{}, err
}

if err := o.Manager.logUpgradeStarted(sandbox); err != nil {
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -131,7 +136,7 @@ func (o *OfflineUpgradeReconciler) Reconcile(ctx context.Context, _ *ctrl.Reques
}
}

return ctrl.Result{}, nil
return ctrl.Result{}, o.Manager.logUpgradeSucceeded(sandbox)
}

// logEventIfOnlineUpgradeRequested will log an event if the vdb has
Expand Down Expand Up @@ -203,7 +208,8 @@ func (o *OfflineUpgradeReconciler) postReschedulePodsMsg(ctx context.Context) (c
// Since there will be processing after to delete the pods so that they come up
// with the new image.
func (o *OfflineUpgradeReconciler) updateImageInStatefulSets(ctx context.Context) (ctrl.Result, error) {
numStsChanged, err := o.Manager.updateImageInStatefulSets(ctx)
sandbox := o.PFacts.GetSandboxName()
numStsChanged, err := o.Manager.updateImageInStatefulSets(ctx, sandbox)
if numStsChanged > 0 {
o.PFacts.Invalidate()
}
Expand All @@ -215,7 +221,8 @@ func (o *OfflineUpgradeReconciler) updateImageInStatefulSets(ctx context.Context
// the sts is OnDelete. Deleting the pods ensures they get rescheduled with the
// new image.
func (o *OfflineUpgradeReconciler) deletePods(ctx context.Context) (ctrl.Result, error) {
numPodsDeleted, err := o.Manager.deletePodsRunningOldImage(ctx, "")
sandbox := o.PFacts.GetSandboxName()
numPodsDeleted, err := o.Manager.deletePodsRunningOldImage(ctx, "", sandbox)
if numPodsDeleted > 0 {
o.PFacts.Invalidate()
}
Expand All @@ -228,7 +235,8 @@ func (o *OfflineUpgradeReconciler) checkNMADeploymentChange(ctx context.Context)
return ctrl.Result{}, nil
}

stss, err := o.Finder.FindStatefulSets(ctx, iter.FindExisting)
sandbox := o.PFacts.GetSandboxName()
stss, err := o.Finder.FindStatefulSets(ctx, iter.FindExisting, sandbox)
if err != nil {
return ctrl.Result{}, err
}
Expand All @@ -251,7 +259,12 @@ func (o *OfflineUpgradeReconciler) checkNMADeploymentChange(ctx context.Context)
// only occur if there is at least one pod that exists.
func (o *OfflineUpgradeReconciler) checkForNewPods(ctx context.Context) (ctrl.Result, error) {
foundPodWithNewImage := false
pods, err := o.Finder.FindPods(ctx, iter.FindExisting, vapi.MainCluster)
sandbox := o.PFacts.GetSandboxName()
pods, err := o.Finder.FindPods(ctx, iter.FindExisting, sandbox)
if err != nil {
return ctrl.Result{}, err
}
targetImage, err := o.Manager.getTargetImage(sandbox)
if err != nil {
return ctrl.Result{}, err
}
Expand All @@ -261,7 +274,7 @@ func (o *OfflineUpgradeReconciler) checkForNewPods(ctx context.Context) (ctrl.Re
if err != nil {
return ctrl.Result{}, err
}
if cntImage == o.Vdb.Spec.Image {
if cntImage == targetImage {
foundPodWithNewImage = true
break
}
Expand Down Expand Up @@ -342,6 +355,11 @@ func (o *OfflineUpgradeReconciler) addClientRoutingLabel(ctx context.Context) (c

// anyPodsRunningWithOldImage will check if any upNode pods are running with the old image.
func (o *OfflineUpgradeReconciler) anyPodsRunningWithOldImage(ctx context.Context) (bool, error) {
sandbox := o.PFacts.GetSandboxName()
targetImage, err := o.Manager.getTargetImage(sandbox)
if err != nil {
return false, err
}
for pn, pf := range o.PFacts.Detail {
if !pf.upNode {
continue
Expand All @@ -359,7 +377,7 @@ func (o *OfflineUpgradeReconciler) anyPodsRunningWithOldImage(ctx context.Contex
if err != nil {
return false, err
}
if cntImage != o.Vdb.Spec.Image {
if cntImage != targetImage {
return true, nil
}
}
Expand Down
39 changes: 24 additions & 15 deletions pkg/controllers/vdb/onlineupgrade_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,24 +58,29 @@ type OnlineUpgradeReconciler struct {
func MakeOnlineUpgradeReconciler(vdbrecon *VerticaDBReconciler, log logr.Logger,
vdb *vapi.VerticaDB, prunner cmds.PodRunner, pfacts *PodFacts, dispatcher vadmin.Dispatcher) controllers.ReconcileActor {
return &OnlineUpgradeReconciler{
VRec: vdbrecon,
Log: log.WithName("OnlineUpgradeReconciler"),
Vdb: vdb,
PRunner: prunner,
PFacts: pfacts,
Finder: iter.MakeSubclusterFinder(vdbrecon.Client, vdb),
Manager: *MakeUpgradeManager(vdbrecon, log, vdb, vapi.OnlineUpgradeInProgress, onlineUpgradeAllowed),
VRec: vdbrecon,
Log: log.WithName("OnlineUpgradeReconciler"),
Vdb: vdb,
PRunner: prunner,
PFacts: pfacts,
Finder: iter.MakeSubclusterFinder(vdbrecon.Client, vdb),
Manager: *MakeUpgradeManager(vdbrecon, log, vdb, vapi.OnlineUpgradeInProgress,
onlineUpgradeAllowed),
Dispatcher: dispatcher,
}
}

// Reconcile will handle the process of the vertica image changing. For
// example, this can automate the process for an upgrade.
func (o *OnlineUpgradeReconciler) Reconcile(ctx context.Context, _ *ctrl.Request) (ctrl.Result, error) {
if ok, err := o.Manager.IsUpgradeNeeded(ctx); !ok || err != nil {
sandbox := o.PFacts.GetSandboxName()
if ok, err := o.Manager.IsUpgradeNeeded(ctx, sandbox); !ok || err != nil {
return ctrl.Result{}, err
}

if err := o.Manager.logUpgradeStarted(sandbox); err != nil {
return ctrl.Result{}, err
}
// Functions to perform when the image changes. Order matters.
funcs := []func(context.Context) (ctrl.Result, error){
// Initiate an upgrade by setting condition and event recording
Expand Down Expand Up @@ -122,7 +127,7 @@ func (o *OnlineUpgradeReconciler) Reconcile(ctx context.Context, _ *ctrl.Request
}
}

return ctrl.Result{}, nil
return ctrl.Result{}, o.Manager.logUpgradeSucceeded(sandbox)
}

// loadSubclusterState will load state into the OnlineUpgradeReconciler that
Expand All @@ -135,8 +140,8 @@ func (o *OnlineUpgradeReconciler) loadSubclusterState(ctx context.Context) (ctrl
}

o.TransientSc = o.Vdb.FindTransientSubcluster()

err = o.Manager.cachePrimaryImages(ctx)
sandbox := o.PFacts.GetSandboxName()
err = o.Manager.cachePrimaryImages(ctx, sandbox)
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -202,7 +207,8 @@ func (o *OnlineUpgradeReconciler) addTransientToVdb(ctx context.Context) (ctrl.R
return ctrl.Result{}, nil
}

oldImage, ok := o.Manager.fetchOldImage()
sandbox := o.PFacts.GetSandboxName()
oldImage, ok := o.Manager.fetchOldImage(sandbox)
if !ok {
return ctrl.Result{}, fmt.Errorf("could not determine the old image name. "+
"Only available image is %s", o.Vdb.Spec.Image)
Expand Down Expand Up @@ -327,7 +333,8 @@ func (o *OnlineUpgradeReconciler) addClientRoutingLabelToTransientNodes(ctx cont
// processFunc for each one that matches the given type.
func (o *OnlineUpgradeReconciler) iterateSubclusterType(ctx context.Context, scType string,
processFunc func(context.Context, *appsv1.StatefulSet) (ctrl.Result, error)) (ctrl.Result, error) {
stss, err := o.Finder.FindStatefulSets(ctx, iter.FindExisting|iter.FindSorted)
sandbox := o.PFacts.GetSandboxName()
stss, err := o.Finder.FindStatefulSets(ctx, iter.FindExisting|iter.FindSorted, sandbox)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -454,7 +461,8 @@ func (o *OnlineUpgradeReconciler) recreateSubclusterWithNewImage(ctx context.Con
}

scName := sts.Labels[vmeta.SubclusterNameLabel]
podsDeleted, err := o.Manager.deletePodsRunningOldImage(ctx, scName)
sandbox := o.PFacts.GetSandboxName()
podsDeleted, err := o.Manager.deletePodsRunningOldImage(ctx, scName, sandbox)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -498,6 +506,7 @@ func (o *OnlineUpgradeReconciler) checkVersion(ctx context.Context, sts *appsv1.
}

func (o *OnlineUpgradeReconciler) handleDeploymentChange(ctx context.Context, _ *appsv1.StatefulSet) (ctrl.Result, error) {
sandbox := o.PFacts.GetSandboxName()
// We need to check if we are changing deployment types. This isn't allowed
// for online upgrade because the vclusterops library won't know how to talk
// to the pods that are still running the old admintools deployment since it
Expand All @@ -508,7 +517,7 @@ func (o *OnlineUpgradeReconciler) handleDeploymentChange(ctx context.Context, _
if primaryRunningVClusterOps > 0 && secondaryRunningAdmintools > 0 {
o.Log.Info("online upgrade isn't supported when changing deployment types from admintools to vclusterops",
"primaryRunningVClusterOps", primaryRunningVClusterOps, "secondaryRunningAdmintools", secondaryRunningAdmintools)
if err := o.Manager.deleteStsRunningOldImage(ctx); err != nil {
if err := o.Manager.deleteStsRunningOldImage(ctx, sandbox); err != nil {
return ctrl.Result{}, err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/vdb/onlineupgrade_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ var _ = Describe("onlineupgrade_reconcile", func() {

r := createOnlineUpgradeReconciler(ctx, vdb)
Expect(r.loadSubclusterState(ctx)).Should(Equal(ctrl.Result{}))
oldImage, ok := r.Manager.fetchOldImage()
oldImage, ok := r.Manager.fetchOldImage(vapi.MainCluster)
Expect(ok).Should(BeTrue())
Expect(oldImage).Should(Equal(OldImage))
})
Expand Down
11 changes: 11 additions & 0 deletions pkg/controllers/vdb/podfacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -1141,6 +1141,17 @@ func (p *PodFacts) findExpectedNodeNames() []string {
return expectedNodeNames
}

// GetSandboxName returns the name of the sandbox, or empty string
// for main cluster, the pods belong to
func (p *PodFacts) GetSandboxName() string {
for _, v := range p.Detail {
// all pods in the podfacts belong to either
// the same sandbox or the main cluster
return v.sandbox
}
return ""
}

// setSandboxNodeType sets the isPrimary state for a sandboxed
// subcluster's node
func setSandboxNodeType(pf *PodFact) {
Expand Down
14 changes: 9 additions & 5 deletions pkg/controllers/vdb/replicatedupgrade_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,18 @@ func MakeReplicatedUpgradeReconciler(vdbrecon *VerticaDBReconciler, log logr.Log

// Reconcile will automate the process of a replicated upgrade.
func (r *ReplicatedUpgradeReconciler) Reconcile(ctx context.Context, _ *ctrl.Request) (ctrl.Result, error) {
if ok, err := r.Manager.IsUpgradeNeeded(ctx); !ok || err != nil {
if ok, err := r.Manager.IsUpgradeNeeded(ctx, vapi.MainCluster); !ok || err != nil {
return ctrl.Result{}, err
}

if err := r.PFacts[vapi.MainCluster].Collect(ctx, r.VDB); err != nil {
return ctrl.Result{}, err
}

if err := r.Manager.logUpgradeStarted(vapi.MainCluster); err != nil {
return ctrl.Result{}, err
}

// Functions to perform when the image changes. Order matters.
funcs := []func(context.Context) (ctrl.Result, error){
// Initiate an upgrade by setting condition and event recording
Expand Down Expand Up @@ -127,13 +131,13 @@ func (r *ReplicatedUpgradeReconciler) Reconcile(ctx context.Context, _ *ctrl.Req
}
}

return ctrl.Result{}, nil
return ctrl.Result{}, r.Manager.logUpgradeSucceeded(vapi.MainCluster)
}

// loadUpgradeState will load state into the reconciler that
// is used in subsequent steps.
func (r *ReplicatedUpgradeReconciler) loadUpgradeState(ctx context.Context) (ctrl.Result, error) {
err := r.Manager.cachePrimaryImages(ctx)
err := r.Manager.cachePrimaryImages(ctx, vapi.MainCluster)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -493,7 +497,7 @@ func (r *ReplicatedUpgradeReconciler) scaleOutSecondariesInReplicaGroupB(ctx con
// be added directly to r.VDB. This is a callback function for
// updateVDBWithRetry to prepare the vdb for update.
func (r *ReplicatedUpgradeReconciler) addNewSubclustersForPrimaries() (bool, error) {
oldImage, found := r.Manager.fetchOldImage()
oldImage, found := r.Manager.fetchOldImage(vapi.MainCluster)
if !found {
return false, errors.New("Could not find old image needed for new subclusters")
}
Expand Down Expand Up @@ -547,7 +551,7 @@ func (r *ReplicatedUpgradeReconciler) assignSubclustersToReplicaGroupACallback()
// replica group B into the sandbox. This is a callback function for
// updateVDBWithRetry to prepare the vdb for an update.
func (r *ReplicatedUpgradeReconciler) moveReplicaGroupBSubclusterToSandbox() (bool, error) {
oldImage, found := r.Manager.fetchOldImage()
oldImage, found := r.Manager.fetchOldImage(vapi.MainCluster)
if !found {
return false, errors.New("Could not find old image")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/vdb/revivedb_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func (r *ReviveDBReconciler) genDescribeOpts(initiatorPod types.NamespacedName,
func (r *ReviveDBReconciler) deleteRevisionPendingSts(ctx context.Context) (ctrl.Result, error) {
numStsDeleted := 0
finder := iter.MakeSubclusterFinder(r.VRec.Client, r.Vdb)
stss, err := finder.FindStatefulSets(ctx, iter.FindInVdb)
stss, err := finder.FindStatefulSets(ctx, iter.FindInVdb, vapi.MainCluster)
if err != nil {
return ctrl.Result{}, err
}
Expand Down
Loading

0 comments on commit 688289a

Please sign in to comment.