Skip to content

Commit

Permalink
Reconcile changes to the nats streaming server image string
Browse files Browse the repository at this point in the history
  • Loading branch information
renier authored and Renier Morales committed Aug 9, 2020
1 parent 079120f commit 250e912
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 4 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ spec:
To build the `nats-streaming-operator` Docker image:

```sh
$ docker build . -f docker/operator/Dockerfile -t <image>:<tag> .
$ docker build -f docker/operator/Dockerfile -t <image>:<tag> .
```

You'll need Docker `17.06.0-ce` or higher.
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,11 @@ k8s.io/apiextensions-apiserver v0.0.0-20190320070711-2af94a2a482f h1:MfV9UCbLKKn
k8s.io/apiextensions-apiserver v0.0.0-20190320070711-2af94a2a482f/go.mod h1:IxkesAMoaCRoLrPJdZNZUQp9NfZnzqaVzLhb2VEQzXE=
k8s.io/apimachinery v0.0.0-20190320104356-82cbdc1b6ac2 h1:kAl8fP8Gk3mJ4hZBQOkQ1HkrD1i5n22S3ZKlVGTJsBA=
k8s.io/apimachinery v0.0.0-20190320104356-82cbdc1b6ac2/go.mod h1:ccL7Eh7zubPUSh9A3USN90/OzHNSVN6zxzde07TDCL0=
k8s.io/apimachinery v0.18.6 h1:RtFHnfGNfd1N0LeSrKCUznz5xtUP1elRGvHJbL3Ntag=
k8s.io/client-go v1.5.1 h1:XaX/lo2/u3/pmFau8HN+sB5C/b4dc4Dmm2eXjBH4p1E=
k8s.io/client-go v10.0.0+incompatible h1:F1IqCqw7oMBzDkqlcBymRq1450wD0eNqLE9jzUrIi34=
k8s.io/client-go v10.0.0+incompatible/go.mod h1:7vJpHMYJwNQCWgzmNV+VYUl1zCObLyodBc8nIyt8L5s=
k8s.io/client-go v11.0.0+incompatible h1:LBbX2+lOwY9flffWlJM7f1Ct8V2SRNiMRDFeiwnJo9o=
k8s.io/klog v0.2.0 h1:0ElL0OHzF3N+OhoJTL0uca20SxtYt4X4+bzHeqrB83c=
k8s.io/klog v0.2.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/kube-openapi v0.0.0-20190320154901-5e45bb682580 h1:fq0ZXW/BAIFZH+dazlups6JTVdwzRo5d9riFA103yuQ=
Expand Down
2 changes: 1 addition & 1 deletion internal/operator/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
// DefaultNATSStreamingImage is the default image
// of NATS Streaming that will be used, meant to be
// the latest release available.
DefaultNATSStreamingImage = "nats-streaming:0.16.2"
DefaultNATSStreamingImage = "nats-streaming:0.18.0"

// DefaultNATSStreamingClusterSize is the default size
// for the cluster. Clustering is done via Raft so
Expand Down
43 changes: 43 additions & 0 deletions internal/operator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
k8slabels "k8s.io/apimachinery/pkg/labels"
k8sschema "k8s.io/apimachinery/pkg/runtime/schema"
k8stypes "k8s.io/apimachinery/pkg/types"
k8sutilwait "k8s.io/apimachinery/pkg/util/wait"
k8sclient "k8s.io/client-go/kubernetes"
k8srestapi "k8s.io/client-go/rest"
k8scache "k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -78,6 +79,11 @@ func NewController(opts *Options) *Controller {
}
}

func k8sDeleteInBackground() *k8smetav1.DeleteOptions {
propagation := k8smetav1.DeletePropagationBackground
return &k8smetav1.DeleteOptions{PropagationPolicy: &propagation}
}

// SetupClients takes the configuration and prepares the rest
// clients that will be used to interact with the cluster objects.
func (c *Controller) SetupClients(cfg *k8srestapi.Config) error {
Expand Down Expand Up @@ -287,6 +293,13 @@ func (c *Controller) processDelete(ctx context.Context, v interface{}) error {
}

func (c *Controller) reconcile(o *stanv1alpha1.NatsStreamingCluster) error {
if err := c.reconcileSize(o); err != nil {
return err
}
return c.reconcileImage(o)
}

func (c *Controller) reconcileSize(o *stanv1alpha1.NatsStreamingCluster) error {
pods, err := c.findRunningPods(o.Name, o.Namespace)
if err != nil {
return err
Expand Down Expand Up @@ -330,6 +343,36 @@ func (c *Controller) reconcile(o *stanv1alpha1.NatsStreamingCluster) error {
return nil
}

func (c *Controller) reconcileImage(o *stanv1alpha1.NatsStreamingCluster) error {
pods, err := c.findRunningPods(o.Name, o.Namespace)
if err != nil {
return err
}

desiredImage := o.Spec.Image
for _, pod := range pods {
if desiredImage != pod.Spec.Containers[0].Image {
log.Infof("Outdated image in pod '%s/%s' restarting it...", o.Namespace, pod.ObjectMeta.Name)
c.kc.CoreV1().Pods(o.Namespace).Delete(pod.ObjectMeta.Name, k8sDeleteInBackground())
// Wait for the pod to delete and the new one to get to Ready before moving on to the next one.
k8sutilwait.PollImmediate(5*time.Second, 5*time.Minute, func() (done bool, err error) {
newPod, err := c.kc.CoreV1().Pods(o.Namespace).Get(pod.ObjectMeta.Name, k8smetav1.GetOptions{})
if err != nil {
log.Infof("Pod gone '%s/%s' gone, waiting for it to restart", o.Namespace, pod.ObjectMeta.Name)
return false, nil
} else if newPod.UID == pod.UID {
log.Infof("Pod '%s/%s' not deleted yet. Waiting until it does...", o.Namespace, pod.ObjectMeta.Name)
return false, nil // this is not the new pod yet
}

return newPod.Status.Phase == k8scorev1.PodRunning && newPod.Status.ContainerStatuses[0].Ready, nil
})
}
}

return nil
}

func (c *Controller) createBootstrapPod(o *stanv1alpha1.NatsStreamingCluster) error {
pod := newStanPod(o)
pod.Name = fmt.Sprintf("%s-1", o.Name)
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/streaming/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type NatsStreamingClusterSpec struct {
// Size is the number of nodes in the NATS Streaming cluster.
Size int32 `json:"size"`

// Version is the version of NATS Streaming that is being used.
// By default it will be the latest version.
// Image is the version of NATS Streaming that is being used.
// By default it will be set to the latest version.
Image string `json:"image"`

// NatsService is the Kubernetes service to which the NATS
Expand Down

0 comments on commit 250e912

Please sign in to comment.