Skip to content

Commit

Permalink
Move restart_node and start_db behind dispatcher abstraction (vertica…
Browse files Browse the repository at this point in the history
…#428)

This is a continuation of building out the dispatcher interface. The
dispatcher interface will be used to switch between admintools and
vcluster library. This adds the final two calls: RestartNode and
StartDB.
  • Loading branch information
spilchen committed Jun 26, 2023
1 parent edb0481 commit 6a524b6
Show file tree
Hide file tree
Showing 14 changed files with 432 additions and 85 deletions.
94 changes: 23 additions & 71 deletions pkg/controllers/vdb/restart_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/vertica/vertica-kubernetes/pkg/vadmin"
"github.com/vertica/vertica-kubernetes/pkg/vadmin/opts/fetchnodestate"
"github.com/vertica/vertica-kubernetes/pkg/vadmin/opts/reip"
"github.com/vertica/vertica-kubernetes/pkg/vadmin/opts/restartnode"
"github.com/vertica/vertica-kubernetes/pkg/vadmin/opts/startdb"
"github.com/vertica/vertica-kubernetes/pkg/vdbstatus"
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -288,10 +290,7 @@ func (r *RestartReconciler) restartPods(ctx context.Context, pods []*PodFact) (c
return r.makeResultForLivenessProbeWait(ctx)
}

vnodeList := genRestartVNodeList(downPods)
ipList := genRestartIPList(downPods)
cmd := r.genRestartNodeCmd(vnodeList, ipList)
if res, err := r.execRestartPods(ctx, downPods, cmd); verrors.IsReconcileAborted(res, err) {
if res, err := r.execRestartPods(ctx, downPods); verrors.IsReconcileAborted(res, err) {
return res, err
}

Expand Down Expand Up @@ -343,23 +342,30 @@ func (r *RestartReconciler) fetchClusterNodeStatus(ctx context.Context, pods []*
}

// execRestartPods will execute the AT command and event recording for restart pods.
func (r *RestartReconciler) execRestartPods(ctx context.Context, downPods []*PodFact, cmd []string) (ctrl.Result, error) {
func (r *RestartReconciler) execRestartPods(ctx context.Context, downPods []*PodFact) (ctrl.Result, error) {
podNames := make([]string, 0, len(downPods))
for _, pods := range downPods {
podNames = append(podNames, pods.name.Name)
}

opts := []restartnode.Option{
restartnode.WithInitiator(r.InitiatorPod, r.InitiatorPodIP),
}
for i := range downPods {
opts = append(opts, restartnode.WithHost(downPods[i].vnodeName, downPods[i].podIP))
}

r.VRec.Eventf(r.Vdb, corev1.EventTypeNormal, events.NodeRestartStarted,
"Starting database restart node of the following pods: %s", strings.Join(podNames, ", "))
start := time.Now()
labels := metrics.MakeVDBLabels(r.Vdb)
stdout, _, err := r.PRunner.ExecAdmintools(ctx, r.InitiatorPod, names.ServerContainer, cmd...)
res, err := r.Dispatcher.RestartNode(ctx, opts...)
elapsedTimeInSeconds := time.Since(start).Seconds()
metrics.NodesRestartDuration.With(labels).Observe(elapsedTimeInSeconds)
metrics.NodesRestartAttempt.With(labels).Inc()
if err != nil {
if verrors.IsReconcileAborted(res, err) {
metrics.NodesRestartFailed.With(labels).Inc()
return r.EVLogr.LogFailure("restart_node", stdout, err)
return res, err
}
r.VRec.Eventf(r.Vdb, corev1.EventTypeNormal, events.NodeRestartSucceeded,
"Successfully restarted database nodes and it took %ds", int(elapsedTimeInSeconds))
Expand Down Expand Up @@ -392,42 +398,29 @@ func (r *RestartReconciler) reipNodes(ctx context.Context, pods []*PodFact) (ctr
// restartCluster will call start database. It is assumed that the cluster has
// already run re_ip.
func (r *RestartReconciler) restartCluster(ctx context.Context, downPods []*PodFact) (ctrl.Result, error) {
cmd := r.genStartDBCommand(downPods)
opts := []startdb.Option{
startdb.WithInitiator(r.InitiatorPod, r.InitiatorPodIP),
}
for i := range downPods {
opts = append(opts, startdb.WithHost(downPods[i].podIP))
}
r.VRec.Event(r.Vdb, corev1.EventTypeNormal, events.ClusterRestartStarted,
"Starting restart of the cluster")
start := time.Now()
labels := metrics.MakeVDBLabels(r.Vdb)
stdout, _, err := r.PRunner.ExecAdmintools(ctx, r.InitiatorPod, names.ServerContainer, cmd...)
res, err := r.Dispatcher.StartDB(ctx, opts...)
elapsedTimeInSeconds := time.Since(start).Seconds()
metrics.ClusterRestartDuration.With(labels).Observe(elapsedTimeInSeconds)
metrics.ClusterRestartAttempt.With(labels).Inc()
if err != nil {
if verrors.IsReconcileAborted(res, err) {
metrics.ClusterRestartFailure.With(labels).Inc()
return r.EVLogr.LogFailure("start_db", stdout, err)
return res, err
}
r.VRec.Eventf(r.Vdb, corev1.EventTypeNormal, events.ClusterRestartSucceeded,
"Successfully restarted the cluster and it took %ds", int(elapsedTimeInSeconds))
return ctrl.Result{}, err
}

// genRestartVNodeList returns the vnodes of all of the hosts in downPods
func genRestartVNodeList(downPods []*PodFact) []string {
hostList := []string{}
for _, v := range downPods {
hostList = append(hostList, v.vnodeName)
}
return hostList
}

// genRestartIPList returns the IPs of all of the hosts in downPods
func genRestartIPList(downPods []*PodFact) []string {
ipList := []string{}
for _, v := range downPods {
ipList = append(ipList, v.podIP)
}
return ipList
}

// killReadOnlyProcesses will remove any running vertica processes that are
// currently in read-only. At this point, we have determined that the read-only
// nodes need to be shutdown so we can restart them to have full write access.
Expand Down Expand Up @@ -561,47 +554,6 @@ func (r *RestartReconciler) isStartupProbeActive(ctx context.Context, nm types.N
return true, nil
}

// genRestartNodeCmd returns the command to run to restart a pod
func (r *RestartReconciler) genRestartNodeCmd(vnodeList, ipList []string) []string {
cmd := []string{
"-t", "restart_node",
"--database=" + r.Vdb.Spec.DBName,
"--hosts=" + strings.Join(vnodeList, ","),
"--new-host-ips=" + strings.Join(ipList, ","),
"--noprompt",
}
if r.Vdb.Spec.RestartTimeout != 0 {
cmd = append(cmd, fmt.Sprintf("--timeout=%d", r.Vdb.Spec.RestartTimeout))
}
return cmd
}

// genStartDBCommand will return the command for start_db
func (r *RestartReconciler) genStartDBCommand(downPods []*PodFact) []string {
cmd := []string{
"-t", "start_db",
"--database=" + r.Vdb.Spec.DBName,
"--noprompt",
}
if r.Vdb.Spec.IgnoreClusterLease {
cmd = append(cmd, "--ignore-cluster-lease")
}
if r.Vdb.Spec.RestartTimeout != 0 {
cmd = append(cmd, fmt.Sprintf("--timeout=%d", r.Vdb.Spec.RestartTimeout))
}

// In all versions that we support we can include a list of hosts to start.
// This parameter becomes important for online upgrade as we use this to
// start the primaries while the secondary are in read-only.
hostNames := []string{}
for _, pod := range downPods {
hostNames = append(hostNames, pod.podIP)
}
cmd = append(cmd, "--hosts", strings.Join(hostNames, ","))

return cmd
}

// setATPod will set r.ATPod if not already set.
// Caller can indicate whether there is a requirement that it must be run from a
// pod that is currently not running the vertica daemon.
Expand Down
14 changes: 0 additions & 14 deletions pkg/controllers/vdb/restart_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,20 +430,6 @@ var _ = Describe("restart_reconciler", func() {
Expect(len(restart)).Should(Equal(0))
})

It("should use --hosts option in start_db if on earliest version that we support", func() {
vdb := vapi.MakeVDB()
fpr := &cmds.FakePodRunner{}
dispatcher := vdbRec.makeDispatcher(logger, vdb, fpr)
act := MakeRestartReconciler(vdbRec, logger, vdb, fpr, &PodFacts{}, RestartProcessReadOnly, dispatcher)
r := act.(*RestartReconciler)
downPods := []*PodFact{
{podIP: "9.10.1.1"},
{podIP: "9.10.1.2"},
}
vdb.Annotations[vapi.VersionAnnotation] = vapi.MinimumVersion
Expect(r.genStartDBCommand(downPods)).Should(ContainElements("--hosts", "9.10.1.1,9.10.1.2"))
})

It("should requeue if k-safety is 0, there are no UP nodes and some pods aren't running", func() {
vdb := vapi.MakeVDB()
vdb.Spec.Subclusters[0].Size = 3
Expand Down
1 change: 1 addition & 0 deletions pkg/vadmin/add_sc_vc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

// AddSubcluster will create a subcluster in the vertica cluster.
func (v VClusterOps) AddSubcluster(ctx context.Context, opts ...addsc.Option) error {
v.Log.Info("Starting vcluster AddSubcluster")
s := addsc.Parms{}
s.Make(opts...)
return fmt.Errorf("not implemented")
Expand Down
12 changes: 12 additions & 0 deletions pkg/vadmin/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import (
"github.com/vertica/vertica-kubernetes/pkg/vadmin/opts/reip"
"github.com/vertica/vertica-kubernetes/pkg/vadmin/opts/removenode"
"github.com/vertica/vertica-kubernetes/pkg/vadmin/opts/removesc"
"github.com/vertica/vertica-kubernetes/pkg/vadmin/opts/restartnode"
"github.com/vertica/vertica-kubernetes/pkg/vadmin/opts/revivedb"
"github.com/vertica/vertica-kubernetes/pkg/vadmin/opts/startdb"
"github.com/vertica/vertica-kubernetes/pkg/vadmin/opts/stopdb"
ctrl "sigs.k8s.io/controller-runtime"
)
Expand Down Expand Up @@ -69,6 +71,16 @@ type Dispatcher interface {

// RemoveSubcluster will remove the given subcluster from the vertica cluster.
RemoveSubcluster(ctx context.Context, opts ...removesc.Option) error

// RestartNode will restart a subset of nodes. Use this when vertica has not
// lost cluster quorum. The IP given for each vnode may not match the current IP
// in the vertica catalogs.
RestartNode(ctx context.Context, opts ...restartnode.Option) (ctrl.Result, error)

// StartDB will start a subset of nodes. Use this when vertica has lost
// cluster quorum. The IP given for each vnode *must* match the current IP
// in the vertica catalog. If they don't, a call to ReIP is necessary.
StartDB(ctx context.Context, opts ...startdb.Option) (ctrl.Result, error)
}

const (
Expand Down
57 changes: 57 additions & 0 deletions pkg/vadmin/opts/restartnode/opts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
(c) Copyright [2021-2023] Open Text.
Licensed under the Apache License, Version 2.0 (the "License");
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package restartnode

import (
"k8s.io/apimachinery/pkg/types"
)

// Parms holds all of the options for a restart node invocation.
type Parms struct {
// InitiatorName and InitiatorIP are both set by the WithInitiator option.
InitiatorName types.NamespacedName
InitiatorIP string
// HostVNodes and HostIPs are filled in by the WithHost option. Each WithHost
// call appends one entry to both, so entries at the same index describe the
// same host.
HostVNodes []string
HostIPs []string
}

// Option is a function that fills in part of a Parms. Options are applied to a
// Parms by its Make method.
type Option func(*Parms)

// Make populates the Parms by applying every supplied option to it, in the
// order the options were passed.
func (s *Parms) Make(opts ...Option) {
	for i := range opts {
		apply := opts[i]
		apply(s)
	}
}

// WithInitiator returns an Option that stores the initiator's namespaced name
// and IP in the Parms.
func WithInitiator(nm types.NamespacedName, ip string) Option {
	return func(p *Parms) {
		p.InitiatorName, p.InitiatorIP = nm, ip
	}
}

// WithHost returns an Option that adds one host to the Parms. The vnode name is
// appended to HostVNodes and the IP to HostIPs, so each call adds exactly one
// entry to both slices.
func WithHost(vnode, ip string) Option {
	return func(s *Parms) {
		// append allocates when given a nil slice, so the slices do not need
		// to be initialized before the first host is added.
		s.HostVNodes = append(s.HostVNodes, vnode)
		s.HostIPs = append(s.HostIPs, ip)
	}
}
52 changes: 52 additions & 0 deletions pkg/vadmin/opts/startdb/opts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
(c) Copyright [2021-2023] Open Text.
Licensed under the Apache License, Version 2.0 (the "License");
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package startdb

import (
"k8s.io/apimachinery/pkg/types"
)

// Parms holds all of the options for a start DB invocation.
type Parms struct {
// InitiatorName and InitiatorIP are both set by the WithInitiator option.
InitiatorName types.NamespacedName
InitiatorIP string
// Hosts is filled in by the WithHost option; each call appends one entry.
Hosts []string
}

// Option is a function that fills in part of a Parms. Options are applied to a
// Parms by its Make method.
type Option func(*Parms)

// Make populates the Parms by applying every supplied option to it, in the
// order the options were passed.
func (s *Parms) Make(opts ...Option) {
	for i := range opts {
		apply := opts[i]
		apply(s)
	}
}

// WithInitiator returns an Option that stores the initiator's namespaced name
// and IP in the Parms.
func WithInitiator(nm types.NamespacedName, ip string) Option {
	return func(p *Parms) {
		p.InitiatorName, p.InitiatorIP = nm, ip
	}
}

// WithHost returns an Option that appends hostName to the Hosts list in the
// Parms. Use it once per host.
func WithHost(hostName string) Option {
	return func(s *Parms) {
		// append allocates when given a nil slice, so Hosts does not need to
		// be initialized before the first host is added.
		s.Hosts = append(s.Hosts, hostName)
	}
}
1 change: 1 addition & 0 deletions pkg/vadmin/remove_node_vc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

// RemoveNode will remove an existing vertica node from the cluster.
func (v VClusterOps) RemoveNode(ctx context.Context, opts ...removenode.Option) error {
v.Log.Info("Starting vcluster RemoveNode")
s := removenode.Parms{}
s.Make(opts...)
return fmt.Errorf("not implemented")
Expand Down
1 change: 1 addition & 0 deletions pkg/vadmin/remove_sc_vc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

// RemoveSubcluster will remove the given subcluster from the vertica cluster.
func (v VClusterOps) RemoveSubcluster(ctx context.Context, opts ...removesc.Option) error {
v.Log.Info("Starting vcluster RemoveSubcluster")
s := removesc.Parms{}
s.Make(opts...)
return fmt.Errorf("not implemented")
Expand Down
55 changes: 55 additions & 0 deletions pkg/vadmin/restart_node_at.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
(c) Copyright [2021-2023] Open Text.
Licensed under the Apache License, Version 2.0 (the "License");
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vadmin

import (
"context"
"fmt"
"strings"

"github.com/vertica/vertica-kubernetes/pkg/events"
"github.com/vertica/vertica-kubernetes/pkg/vadmin/opts/restartnode"
ctrl "sigs.k8s.io/controller-runtime"
)

// RestartNode restarts a subset of the nodes through admintools. It is meant
// for the case where vertica still has cluster quorum. The IP supplied for a
// vnode does not have to match the IP currently stored in the vertica catalogs.
//
// The options are collected into a restartnode.Parms, the restart_node command
// line is built from them, and the command is run against the initiator named
// in the options. On failure the result of logFailure is returned; on success
// an empty ctrl.Result and a nil error are returned.
func (a Admintools) RestartNode(ctx context.Context, opts ...restartnode.Option) (ctrl.Result, error) {
	var parms restartnode.Parms
	parms.Make(opts...)
	args := a.genRestartNodeCmd(&parms)
	stdout, err := a.execAdmintools(ctx, parms.InitiatorName, args...)
	if err != nil {
		return a.logFailure("restart_node", events.MgmtFailed, stdout, err)
	}
	return ctrl.Result{}, nil
}

// genRestartNodeCmd builds the admintools argument list for restart_node. The
// vnode names and IPs from s are each joined with commas, and a --timeout
// argument is added only when the VDB's RestartTimeout is non-zero.
func (a Admintools) genRestartNodeCmd(s *restartnode.Parms) []string {
	args := []string{
		"-t", "restart_node",
		"--database=" + a.VDB.Spec.DBName,
		"--hosts=" + strings.Join(s.HostVNodes, ","),
		"--new-host-ips=" + strings.Join(s.HostIPs, ","),
		"--noprompt",
	}
	if timeout := a.VDB.Spec.RestartTimeout; timeout != 0 {
		args = append(args, fmt.Sprintf("--timeout=%d", timeout))
	}
	return args
}
Loading

0 comments on commit 6a524b6

Please sign in to comment.