Skip to content

Commit

Permalink
Remove Configuration state from controller
Browse files Browse the repository at this point in the history
  • Loading branch information
dtomcej authored and traefiker committed Oct 8, 2019
1 parent 53f2a86 commit 53c8104
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 38 deletions.
33 changes: 7 additions & 26 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/containous/maesh/internal/deployer"
"github.com/containous/maesh/internal/k8s"
"github.com/containous/maesh/internal/message"
"github.com/containous/maesh/internal/providers/base"
"github.com/containous/maesh/internal/providers/kubernetes"
"github.com/containous/maesh/internal/providers/smi"
"github.com/containous/traefik/v2/pkg/config/dynamic"
Expand Down Expand Up @@ -43,7 +42,6 @@ type Controller struct {
deployer *deployer.Deployer
ignored k8s.IgnoreWrapper
smiEnabled bool
traefikConfig *dynamic.Configuration
defaultMode string
meshNamespace string
tcpStateTable *k8s.State
Expand Down Expand Up @@ -107,9 +105,6 @@ func (c *Controller) Init() error {
// Initialize the deployer.
c.deployer = deployer.New(c.clients, c.configurationQueue, c.meshNamespace)

// Initialize an empty configuration with a readinesscheck so that configs deployed to nodes mark them as ready.
c.traefikConfig = base.CreateBaseConfigWithReadiness()

if c.smiEnabled {
c.smiProvider = smi.New(c.clients, c.defaultMode, c.meshNamespace, c.ignored)

Expand All @@ -122,9 +117,6 @@ func (c *Controller) Init() error {

c.smiSplitFactory = smiSplitExternalversions.NewSharedInformerFactoryWithOptions(c.clients.SmiSplitClient, k8s.ResyncPeriod)
c.smiSplitFactory.Split().V1alpha1().TrafficSplits().Informer().AddEventHandler(c.handler)

// Initialize the base configuration with the base SMI middleware
addBaseSMIMiddlewares(c.traefikConfig)
}

return nil
Expand Down Expand Up @@ -242,7 +234,7 @@ func (c *Controller) processNextMessage() bool {
return c.messageQueue.Len() > 0
}

func (c *Controller) buildConfigurationFromProviders() {
func (c *Controller) buildConfigurationFromProviders() *dynamic.Configuration {
// Create all mesh services
if err := c.createMeshServices(); err != nil {
log.Errorf("could not create mesh services: %v", err)
Expand All @@ -259,7 +251,7 @@ func (c *Controller) buildConfigurationFromProviders() {
if err != nil {
log.Errorf("unable to build configuration: %v", err)
}
c.traefikConfig = config
return config
}

func (c *Controller) processCreatedMessage(event message.Message) {
Expand All @@ -277,7 +269,7 @@ func (c *Controller) processCreatedMessage(event message.Message) {
log.Debugf("MeshController ObjectCreated with type: *corev1.Pod: %s/%s", obj.Namespace, obj.Name)
if isMeshPod(obj) {
// Re-Deploy configuration to the created mesh pod.
msg := message.BuildNewConfigWithVersion(c.traefikConfig)
msg := message.BuildNewConfigWithVersion(c.buildConfigurationFromProviders())
// Don't deploy if name or IP are unassigned.
if obj.Name != "" && obj.Status.PodIP != "" {
c.deployer.DeployToPod(obj.Name, obj.Status.PodIP, msg.Config)
Expand All @@ -286,8 +278,7 @@ func (c *Controller) processCreatedMessage(event message.Message) {
return
}

c.buildConfigurationFromProviders()
c.configurationQueue.Add(message.BuildNewConfigWithVersion(c.traefikConfig))
c.configurationQueue.Add(message.BuildNewConfigWithVersion(c.buildConfigurationFromProviders()))
}

func (c *Controller) processUpdatedMessage(event message.Message) {
Expand Down Expand Up @@ -316,7 +307,7 @@ func (c *Controller) processUpdatedMessage(event message.Message) {
log.Debugf("MeshController ObjectUpdated with type: *corev1.Pod: %s/%s", obj.Namespace, obj.Name)
if isMeshPod(obj) {
// Re-Deploy configuration to the updated mesh pod.
msg := message.BuildNewConfigWithVersion(c.traefikConfig)
msg := message.BuildNewConfigWithVersion(c.buildConfigurationFromProviders())
// Don't deploy if name or IP are unassigned.
if obj.Name != "" && obj.Status.PodIP != "" {
c.deployer.DeployToPod(obj.Name, obj.Status.PodIP, msg.Config)
Expand All @@ -326,7 +317,7 @@ func (c *Controller) processUpdatedMessage(event message.Message) {
}

c.buildConfigurationFromProviders()
c.configurationQueue.Add(message.BuildNewConfigWithVersion(c.traefikConfig))
c.configurationQueue.Add(message.BuildNewConfigWithVersion(c.buildConfigurationFromProviders()))
}

func (c *Controller) processDeletedMessage(event message.Message) {
Expand Down Expand Up @@ -357,7 +348,7 @@ func (c *Controller) processDeletedMessage(event message.Message) {
}

c.buildConfigurationFromProviders()
c.configurationQueue.Add(message.BuildNewConfigWithVersion(c.traefikConfig))
c.configurationQueue.Add(message.BuildNewConfigWithVersion(c.buildConfigurationFromProviders()))
}

func (c *Controller) createMeshServices() error {
Expand Down Expand Up @@ -621,13 +612,3 @@ func (c *Controller) saveTCPStateTable() error {
func isMeshPod(pod *corev1.Pod) bool {
return pod.Labels["component"] == "maesh-mesh"
}

func addBaseSMIMiddlewares(config *dynamic.Configuration) {
blockAll := &dynamic.Middleware{
IPWhiteList: &dynamic.IPWhiteList{
SourceRange: []string{"255.255.255.255"},
},
}

config.HTTP.Middlewares[k8s.BlockAllMiddlewareKey] = blockAll
}
5 changes: 5 additions & 0 deletions internal/deployer/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ func (d *Deployer) deployConfiguration(c *dynamic.Configuration) bool {

// DeployToPod takes the configuration, and adds it into the deploy queue for a pod.
func (d *Deployer) DeployToPod(name, ip string, c *dynamic.Configuration) {
if name == "" && ip == "" {
// If there is no name and ip, then just return.
return
}

// Make a copy to deploy, so changes to the main configuration don't propagate
deployConfig := c.DeepCopy()

Expand Down
12 changes: 12 additions & 0 deletions internal/providers/base/base.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package base

import (
"github.com/containous/maesh/internal/k8s"
"github.com/containous/traefik/v2/pkg/config/dynamic"
splitv1alpha1 "github.com/deislabs/smi-sdk-go/pkg/apis/split/v1alpha1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -58,3 +59,14 @@ func GetEndpointsFromList(name, namespace string, endpointList []*corev1.Endpoin

return nil
}

// AddBaseSMIMiddlewares adds base middleware to a dynamic config.
func AddBaseSMIMiddlewares(config *dynamic.Configuration) {
blockAll := &dynamic.Middleware{
IPWhiteList: &dynamic.IPWhiteList{
SourceRange: []string{"255.255.255.255"},
},
}

config.HTTP.Middlewares[k8s.BlockAllMiddlewareKey] = blockAll
}
28 changes: 16 additions & 12 deletions internal/providers/kubernetes/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,15 @@ func (p *Provider) buildTCPRouter(port int, serviceName string) *dynamic.TCPRout

func (p *Provider) buildService(endpoints *corev1.Endpoints) *dynamic.Service {
var servers []dynamic.Server
for _, subset := range endpoints.Subsets {
for _, endpointPort := range subset.Ports {
for _, address := range subset.Addresses {
server := dynamic.Server{
URL: "http://" + net.JoinHostPort(address.IP, strconv.FormatInt(int64(endpointPort.Port), 10)),
if endpoints.Subsets != nil {
for _, subset := range endpoints.Subsets {
for _, endpointPort := range subset.Ports {
for _, address := range subset.Addresses {
server := dynamic.Server{
URL: "http://" + net.JoinHostPort(address.IP, strconv.FormatInt(int64(endpointPort.Port), 10)),
}
servers = append(servers, server)
}
servers = append(servers, server)
}
}
}
Expand All @@ -93,13 +95,15 @@ func (p *Provider) buildService(endpoints *corev1.Endpoints) *dynamic.Service {

func (p *Provider) buildTCPService(endpoints *corev1.Endpoints) *dynamic.TCPService {
var servers []dynamic.TCPServer
for _, subset := range endpoints.Subsets {
for _, endpointPort := range subset.Ports {
for _, address := range subset.Addresses {
server := dynamic.TCPServer{
Address: net.JoinHostPort(address.IP, strconv.FormatInt(int64(endpointPort.Port), 10)),
if endpoints.Subsets != nil {
for _, subset := range endpoints.Subsets {
for _, endpointPort := range subset.Ports {
for _, address := range subset.Addresses {
server := dynamic.TCPServer{
Address: net.JoinHostPort(address.IP, strconv.FormatInt(int64(endpointPort.Port), 10)),
}
servers = append(servers, server)
}
servers = append(servers, server)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions internal/providers/smi/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func New(client k8s.Client, defaultMode string, meshNamespace string, ignored k8
// from a native kubernetes environment.
func (p *Provider) BuildConfig() (*dynamic.Configuration, error) {
config := base.CreateBaseConfigWithReadiness()
base.AddBaseSMIMiddlewares(config)

services, err := p.client.GetServices(metav1.NamespaceAll)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions internal/providers/smi/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,11 @@ func TestBuildConfiguration(t *testing.T) {
SourceRange: []string{"10.4.3.2"},
},
},
"smi-block-all-middleware": {
IPWhiteList: &dynamic.IPWhiteList{
SourceRange: []string{"255.255.255.255"},
},
},
},
},
TCP: &dynamic.TCPConfiguration{
Expand Down

0 comments on commit 53c8104

Please sign in to comment.