Skip to content

Commit

Permalink
Merge pull request #60 from nats-io/ft-option
Browse files Browse the repository at this point in the history
Add option to switch to FT mode
  • Loading branch information
wallyqs authored Dec 20, 2019
2 parents c4f8d6a + d54b9d2 commit 11c50be
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 133 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ sudo: required

language: go
go:
- "1.11.x"
- "1.13.x"

go_import_path: github.com/nats-io/nats-streaming-operator

Expand Down
30 changes: 30 additions & 0 deletions deploy/examples/example-stan-ft-mode.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
---
apiVersion: "streaming.nats.io/v1alpha1"
kind: "NatsStreamingCluster"
metadata:
name: "stan-ft"
spec:
natsSvc: "nats"
size: 3

image: "nats-streaming:latest"

config:
storeDir: "/pv/stan"

# When this option is introduced, the servers are restarted
# one by one until they are all running with ft mode.
ftGroup: "stan"

# Define mounts in the Pod Spec
template:
spec:
volumes:
- name: stan-store-dir
persistentVolumeClaim:
claimName: streaming-pvc
containers:
- name: nats-streaming
volumeMounts:
- mountPath: /pv
name: stan-store-dir
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,32 @@ module github.com/nats-io/nats-streaming-operator
go 1.12

require (
contrib.go.opencensus.io/exporter/ocagent v0.4.9 // indirect
github.com/Azure/go-autorest v11.5.2+incompatible // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/evanphx/json-patch v4.1.0+incompatible // indirect
github.com/gogo/protobuf v1.2.1 // indirect
github.com/golang/mock v1.2.0 // indirect
github.com/golang/groupcache v0.0.0-20191027212112-611e8accdfc9 // indirect
github.com/golang/protobuf v1.3.1 // indirect
github.com/google/btree v1.0.0 // indirect
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf // indirect
github.com/googleapis/gnostic v0.2.0 // indirect
github.com/gophercloud/gophercloud v0.0.0-20190318015731-ff9851476e98 // indirect
github.com/gregjones/httpcache v0.0.0-20190212212710-3befbb6ad0cc // indirect
github.com/grpc-ecosystem/grpc-gateway v1.8.5 // indirect
github.com/hashicorp/golang-lru v0.5.1 // indirect
github.com/imdario/mergo v0.3.7 // indirect
github.com/json-iterator/go v1.1.6 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/onsi/ginkgo v1.7.0 // indirect
github.com/onsi/gomega v1.4.3 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/sirupsen/logrus v1.4.0
github.com/spf13/pflag v1.0.3 // indirect
go.opencensus.io v0.19.2 // indirect
golang.org/x/crypto v0.0.0-20190320223903-b7391e95e576 // indirect
golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53 // indirect
golang.org/x/oauth2 v0.0.0-20190319182350-c85d3e98c914 // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
k8s.io/api v0.0.0-20190111032252-67edc246be36
k8s.io/apiextensions-apiserver v0.0.0-20190320070711-2af94a2a482f
k8s.io/apimachinery v0.0.0-20190320104356-82cbdc1b6ac2
Expand Down
128 changes: 15 additions & 113 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions internal/operator/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import "time"

const (
// Version is the version of the NATS Streaming Operator.
Version = "0.2.2"
Version = "0.3.0"

// DefaultNATSStreamingImage is the default image
// of NATS Streaming that will be used, meant to be
// the latest release available.
DefaultNATSStreamingImage = "nats-streaming:0.12.2"
DefaultNATSStreamingImage = "nats-streaming:0.16.2"

// DefaultNATSStreamingClusterSize is the default size
// for the cluster. Clustering is done via Raft so
Expand Down
24 changes: 17 additions & 7 deletions internal/operator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (c *Controller) reconcile(o *stanv1alpha1.NatsStreamingCluster) error {
} else if n < 0 {
log.Infof("Missing pods for '%s/%s' cluster (size=%d/%d), creating %d pods...", o.Namespace, o.Name, len(pods), o.Spec.Size, n*-1)

if o.Spec.StoreType == "SQL" {
if o.Spec.StoreType == "SQL" || (o.Spec.Config != nil && o.Spec.Config.FTGroup != "") {
return c.createMissingPods(o, n*-1)
}

Expand Down Expand Up @@ -389,19 +389,29 @@ func stanContainerCmd(o *stanv1alpha1.NatsStreamingCluster, pod *k8scorev1.Pod)
} else {
storeArgs = []string{
"-store", "file",
fmt.Sprintf("--cluster_node_id=%q", pod.Name),
}

// Disable clustering if using single instance.
if o.Spec.Size > 1 {
ftModeEnabled := o.Spec.Config != nil && o.Spec.Config.FTGroup != ""
isClustered := o.Spec.Config != nil && (o.Spec.Size > 1 || o.Spec.Config.Clustered)

// Disable clustering if using single instance or FT mode.
if isClustered && !ftModeEnabled {
storeArgs = append(storeArgs, "-clustered")
storeArgs = append(storeArgs, fmt.Sprintf("--cluster_node_id=%q", pod.Name))
}

// Allow using a custom mount path which could be a persistent volume.
if o.Spec.Config != nil && o.Spec.Config.StoreDir != "" {
storeArgs = append(storeArgs, "-dir", o.Spec.Config.StoreDir+"/"+pod.Name)

if o.Spec.Size > 1 {
if ftModeEnabled {
// In case of FT mode then use the name of the first pod
// as the storage directory in order to make it possible
// to switch from clustered mode to fault tolerance mode.
name := fmt.Sprintf("%s-1", o.Name)
storeArgs = append(storeArgs, "-dir", o.Spec.Config.StoreDir+"/"+name)
storeArgs = append(storeArgs, fmt.Sprintf("--ft_group=%s", o.Spec.Config.FTGroup))
} else {
// Using clustering.
storeArgs = append(storeArgs, "-dir", o.Spec.Config.StoreDir+"/"+pod.Name)
storeArgs = append(storeArgs, "--cluster_log_path", o.Spec.Config.StoreDir+"/raft/"+pod.Name)
}
} else {
Expand Down
8 changes: 7 additions & 1 deletion pkg/apis/streaming/v1alpha1/types.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018 The NATS Authors
// Copyright 2018-2019 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -79,6 +79,12 @@ type ServerConfig struct {
// StoreDir is the directory where the files will be persisted,
// in case file system is backed by a persistent volume.
StoreDir string `json:"storeDir"`

// FTGroup enables the fault tolerance mode for the server.
FTGroup string `json:"ftGroup"`

// Clustered enables explicitly in the cluster
Clustered bool `json:"clustered"`
}

type NatsStreamingClusterStatus struct {
Expand Down
83 changes: 81 additions & 2 deletions test/operator/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,89 @@ func TestCreateWithCustomStoreDirTemplate(t *testing.T) {
Namespace: "default",
},
Spec: stanv1alpha1.NatsStreamingClusterSpec{
Size: 1,
Size: 3,
NatsService: "example-nats",
Config: &stanv1alpha1.ServerConfig{
StoreDir: "/my-store-dir",
Clustered: true,
},
PodTemplate: &k8scorev1.PodTemplateSpec{
Spec: k8scorev1.PodSpec{
RestartPolicy: k8scorev1.RestartPolicyNever,
},
},
},
}
_, err = kc.stan.StreamingV1alpha1().NatsStreamingClusters("default").Create(cluster)
if err != nil {
t.Fatal(err)
}
defer func() {
err := kc.stan.StreamingV1alpha1().NatsStreamingClusters("default").Delete(name, &k8smetav1.DeleteOptions{})
if err != nil {
t.Error(err)
}
}()

opts := k8smetav1.ListOptions{
LabelSelector: k8slabels.SelectorFromSet(map[string]string{
"app": "nats-streaming",
"stan_cluster": name,
}).String(),
}

err = waitFor(ctx, func() error {
result, err := kc.core.Pods("default").List(opts)
if err != nil {
return err
}
for _, item := range result.Items {
got := strings.Join(item.Spec.Containers[0].Command, " ")
expected := `/nats-streaming-server -cluster_id stan-cluster-custom-store-dir-test -nats_server nats://example-nats:4222 -m 8222 -store file -clustered --cluster_node_id="stan-cluster-custom-store-dir-test-1" -dir /my-store-dir/stan-cluster-custom-store-dir-test-1 --cluster_log_path /my-store-dir/raft/stan-cluster-custom-store-dir-test-1 -cluster_bootstrap`
if got != expected {
return fmt.Errorf("Expected %s, got: %s", expected, got)
}
}

got := len(result.Items)
if got < 1 {
return fmt.Errorf("Not enough pods, got: %v", got)
}

return nil
})
if err != nil {
t.Error(err)
}
}

func TestCreateWithFTGroup(t *testing.T) {
kc, err := newKubeClients()
if err != nil {
t.Fatal(err)
}
controller := operator.NewController(nil)
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

go controller.Run(ctx)

name := "stan-cluster-ft-group"
cluster := &stanv1alpha1.NatsStreamingCluster{
TypeMeta: k8smetav1.TypeMeta{
Kind: "NatsStreamingCluster",
APIVersion: stanv1alpha1.SchemeGroupVersion.String(),
},
ObjectMeta: k8smetav1.ObjectMeta{
Name: name,
Namespace: "default",
},
Spec: stanv1alpha1.NatsStreamingClusterSpec{
Size: 3,
NatsService: "example-nats",
Config: &stanv1alpha1.ServerConfig{
StoreDir: "/my-store-dir",
FTGroup: "stan-ft",
},
PodTemplate: &k8scorev1.PodTemplateSpec{
Spec: k8scorev1.PodSpec{
Expand Down Expand Up @@ -320,7 +399,7 @@ func TestCreateWithCustomStoreDirTemplate(t *testing.T) {
}
for _, item := range result.Items {
got := strings.Join(item.Spec.Containers[0].Command, " ")
expected := `/nats-streaming-server -cluster_id stan-cluster-custom-store-dir-test -nats_server nats://example-nats:4222 -m 8222 -store file --cluster_node_id="stan-cluster-custom-store-dir-test-1" -dir /my-store-dir/stan-cluster-custom-store-dir-test-1`
expected := `/nats-streaming-server -cluster_id stan-cluster-ft-group -nats_server nats://example-nats:4222 -m 8222 -store file -dir /my-store-dir/stan-cluster-ft-group-1 --ft_group=stan-ft`
if got != expected {
return fmt.Errorf("Expected %s, got: %s", expected, got)
}
Expand Down

0 comments on commit 11c50be

Please sign in to comment.