Skip to content

Commit

Permalink
Synchronize Gateway status into the local k8s API
Browse files Browse the repository at this point in the history
Related-Issue: #308
  • Loading branch information
mangelajo committed Apr 14, 2020
1 parent 8135c91 commit dde698a
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 41 deletions.
53 changes: 28 additions & 25 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ const (
defaultRetryPeriod = 2 // In Seconds
)

var VERSION = "not-compiled-properly"

func main() {
klog.InitFlags(nil)
flag.Parse()
Expand Down Expand Up @@ -90,39 +92,40 @@ func main() {
submarinerInformerFactory := submarinerInformers.NewSharedInformerFactoryWithOptions(submarinerClient, time.Second*30,
submarinerInformers.WithNamespace(submSpec.Namespace))

start := func(context.Context) {
var localSubnets []string
var localSubnets []string

klog.Info("Creating the cable engine")
klog.Info("Creating the cable engine")

localCluster, err := util.GetLocalCluster(submSpec)
if err != nil {
klog.Fatalf("Error creating local cluster object from %#v: %v", submSpec, err)
}
localCluster, err := util.GetLocalCluster(submSpec)
if err != nil {
klog.Fatalf("Error creating local cluster object from %#v: %v", submSpec, err)
}

if len(submSpec.GlobalCidr) > 0 {
localSubnets = submSpec.GlobalCidr
} else {
localSubnets = append(submSpec.ServiceCidr, submSpec.ClusterCidr...)
}
if len(submSpec.GlobalCidr) > 0 {
localSubnets = submSpec.GlobalCidr
} else {
localSubnets = append(submSpec.ServiceCidr, submSpec.ClusterCidr...)
}

if len(submSpec.CableDriver) == 0 {
submSpec.CableDriver = cable.GetDefaultCableDriver()
}
submSpec.CableDriver = strings.ToLower(submSpec.CableDriver)
if len(submSpec.CableDriver) == 0 {
submSpec.CableDriver = cable.GetDefaultCableDriver()
}
submSpec.CableDriver = strings.ToLower(submSpec.CableDriver)

localEndpoint, err := util.GetLocalEndpoint(submSpec.ClusterID, submSpec.CableDriver, map[string]string{}, submSpec.NatEnabled,
localSubnets, util.GetLocalIP())
localEndpoint, err := util.GetLocalEndpoint(submSpec.ClusterID, submSpec.CableDriver, map[string]string{}, submSpec.NatEnabled,
localSubnets, util.GetLocalIP())

if err != nil {
klog.Fatalf("Error creating local endpoint object from %#v: %v", submSpec, err)
}
if err != nil {
klog.Fatalf("Error creating local endpoint object from %#v: %v", submSpec, err)
}

cableEngine, err := cableengine.NewEngine(localSubnets, localCluster, localEndpoint)
if err != nil {
klog.Fatalf("Fatal error occurred creating engine: %v", err)
}
cableEngine, err := cableengine.NewEngine(localSubnets, localCluster, localEndpoint, submarinerClient,
submSpec.Namespace, VERSION, stopCh)
if err != nil {
klog.Fatalf("Fatal error occurred creating engine: %v", err)
}

start := func(context.Context) {
klog.Info("Creating the tunnel controller")

tunnelController := tunnel.NewController(submSpec.Namespace, cableEngine, kubeClient, submarinerClient,
Expand Down
156 changes: 140 additions & 16 deletions pkg/cableengine/cableengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@ package cableengine

import (
"fmt"
"os"
"reflect"
"sync"
"time"

v1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1"
"github.com/submariner-io/submariner/pkg/cable"
submarinerClientset "github.com/submariner-io/submariner/pkg/client/clientset/versioned"
"github.com/submariner-io/submariner/pkg/log"
"github.com/submariner-io/submariner/pkg/types"
"github.com/submariner-io/submariner/pkg/util"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"

// Add supported drivers
Expand All @@ -31,37 +37,160 @@ type Engine interface {
// RemoveCable disconnects the Engine from the given remote endpoint. Upon completion.
// remote Pods and Service may not be accessible any more.
RemoveCable(remote types.SubmarinerEndpoint) error
// ListCableConnections returns a list of cable connection, and the related status
ListCableConnections() (*[]v1.Connection, error)
}

type engine struct {
sync.Mutex

clientset *submarinerClientset.Clientset
driver cable.Driver
localSubnets []string
localCluster types.SubmarinerCluster
localEndpoint types.SubmarinerEndpoint
version string
namespace string
}

// NewEngine creates a new Engine for the local cluster
func NewEngine(localSubnets []string, localCluster types.SubmarinerCluster, localEndpoint types.SubmarinerEndpoint) (Engine, error) {
driver, err := cable.NewDriver(localSubnets, localEndpoint)
if err != nil {
return nil, err
}
func NewEngine(localSubnets []string, localCluster types.SubmarinerCluster, localEndpoint types.SubmarinerEndpoint,
clientset *submarinerClientset.Clientset, submNamespace string, version string, stopCh <-chan struct{}) (Engine, error) {

return &engine{
i := engine{
clientset: clientset,
namespace: submNamespace,
localCluster: localCluster,
localEndpoint: localEndpoint,
localSubnets: localSubnets,
driver: driver,
}, nil
driver: nil,
version: version,
}

go wait.Until(i.syncGatewayStatus, GatewayUpdateIntervalSeconds*time.Second, stopCh)

klog.Info("CableEngine controller started")

return &i, nil
}

const GatewayUpdateIntervalSeconds = 5

func (i *engine) StartEngine() error {
if err := i.startDriver(); err != nil {
return err
}

klog.Infof("Starting %s", i.driver.GetName())
return i.driver.Init()

return nil

}

func (i *engine) startDriver() error {
var err error

if i.driver, err = cable.NewDriver(i.localSubnets, i.localEndpoint); err != nil {
return err
}

if err = i.driver.Init(); err != nil {
return err
}
return nil
}

func (i *engine) syncGatewayStatus() {

i.Lock()
defer i.Unlock()

klog.V(log.TRACE).Info("Running syncGatewayStatus()")

gatewayObj, err := i.generateGatewayObject()
if err != nil {
klog.Errorf("generating gateway object from driver connections: %s", err)
return
}

existingGw, err := i.getLastSyncedGateway(gatewayObj.Name)

// log and stop for any error different to a not-found error
if err != nil && !errors.IsNotFound(err) {
klog.Errorf("trying to read existing gateway from k8s api: %s", err)
return
}

gwClient := i.clientset.SubmarinerV1().Gateways(i.namespace)

if err != nil && errors.IsNotFound(err) {
klog.V(log.TRACE).Infof("Gateway object needs creation: %+v", gatewayObj)
_, err = gwClient.Create(gatewayObj)
if err != nil {
klog.Errorf("creating Gateway object: %s", err)
return
}
} else {
if !reflect.DeepEqual(gatewayObj.Status, existingGw.Status) {
klog.V(log.TRACE).Infof("Gateway object needs an update: %+v", gatewayObj)
existingGw.Status = gatewayObj.Status
existingGw.Labels = gatewayObj.Labels
// this should not be necessary, workaround for:
// E0414 17:00:44.057077 1 cableengine.go:143] updating Gateway object: gateways.submariner.io "cluster2-worker" is invalid: metadata.resourceVersion: Invalid value: 0x0: must be specified for an update
existingGw.TypeMeta = gatewayObj.TypeMeta
gw, err := gwClient.Update(existingGw)
if err != nil {
klog.Errorf("updating Gateway object: %s", err)
return
} else {
klog.V(log.TRACE).Infof("Gateway updated correctly: %+v", gw)
}
} else {
klog.V(log.TRACE).Info("Gateway object didn't need an update")
}
}
}

func (i *engine) getLastSyncedGateway(name string) (*v1.Gateway, error) {

existingGw, err := i.clientset.SubmarinerV1().Gateways(i.namespace).Get(name, metav1.GetOptions{})
klog.V(log.TRACE).Infof("getLastSyncedGateway: %+v", existingGw)
return existingGw, err
}

func (i *engine) generateGatewayObject() (*v1.Gateway, error) {
var hostName string
var err error

if hostName, err = os.Hostname(); err != nil {
return nil, err
}

gateway := v1.Gateway{
Status: v1.GatewayStatus{Version: i.version, Host: hostName},
ObjectMeta: metav1.ObjectMeta{Name: hostName},
}

gateway.Name = hostName

// we should not really need to set those, but we're getting errors otherwise :?
gateway.Kind = "Gateway"
gateway.APIVersion = "submariner.io/v1"

if i.driver == nil {
gateway.Status.HAStatus = v1.HAStatusPassive
gateway.SetLabels(map[string]string{"ha-status": "passive"})
} else {
gateway.Status.HAStatus = v1.HAStatusActive
gateway.SetLabels(map[string]string{"ha-status": "active"})
connections, err := i.driver.GetConnections()
if err != nil {
klog.Errorf("getting driver connections: %s", err)
return nil, err
}
gateway.Status.Connections = *connections
}

klog.V(log.TRACE).Infof("generateGatewayObject: %+v", gateway)
return &gateway, nil
}

func (i *engine) InstallCable(endpoint types.SubmarinerEndpoint) error {
Expand Down Expand Up @@ -120,8 +249,3 @@ func (i *engine) RemoveCable(endpoint types.SubmarinerEndpoint) error {
klog.Infof("Successfully removed Endpoint cable %q", endpoint.Spec.CableName)
return nil
}

func (i *engine) ListCableConnections() (*[]v1.Connection, error) {

return i.driver.GetConnections()
}

0 comments on commit dde698a

Please sign in to comment.