Skip to content

Commit

Permalink
Deploy configuration to mesh pods even if config hasn't changed
Browse files Browse the repository at this point in the history
  • Loading branch information
dtomcej authored and traefiker committed Oct 17, 2019
1 parent aa9496b commit a5e73c1
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 38 deletions.
35 changes: 5 additions & 30 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,11 @@ import (
type Controller struct {
clients *k8s.ClientWrapper
kubernetesFactory informers.SharedInformerFactory
meshFactory informers.SharedInformerFactory
smiAccessFactory smiAccessExternalversions.SharedInformerFactory
smiSpecsFactory smiSpecsExternalversions.SharedInformerFactory
smiSplitFactory smiSplitExternalversions.SharedInformerFactory
handler *Handler
meshHandler *Handler
configRefreshChan chan bool
configRefreshChan chan string
provider base.Provider
ignored k8s.IgnoreWrapper
smiEnabled bool
Expand All @@ -52,18 +50,13 @@ type Controller struct {
// and return an initialized mesh controller object.
func NewMeshController(clients *k8s.ClientWrapper, smiEnabled bool, defaultMode string, meshNamespace string, ignoreNamespaces []string) *Controller {
ignored := k8s.NewIgnored(meshNamespace, ignoreNamespaces)

// configRefreshChan is used to trigger configuration refreshes and deploys.
configRefreshChan := make(chan bool)

configRefreshChan := make(chan string)
handler := NewHandler(ignored, configRefreshChan)
// Create a new mesh handler to handle mesh events (pods)
meshHandler := NewHandler(ignored.WithoutMesh(), configRefreshChan)

c := &Controller{
clients: clients,
handler: handler,
meshHandler: meshHandler,
configRefreshChan: configRefreshChan,
ignored: ignored,
smiEnabled: smiEnabled,
Expand All @@ -82,22 +75,12 @@ func NewMeshController(clients *k8s.ClientWrapper, smiEnabled bool, defaultMode
func (c *Controller) Init() error {
// Register handler funcs to controller funcs.
c.handler.RegisterMeshHandlers(c.createMeshService, c.updateMeshService, c.deleteMeshService)
c.meshHandler.RegisterMeshHandlers(c.createMeshService, c.updateMeshService, c.deleteMeshService)

// Create a new SharedInformerFactory, and register the event handler to informers.
c.kubernetesFactory = informers.NewSharedInformerFactoryWithOptions(c.clients.KubeClient, k8s.ResyncPeriod)
c.kubernetesFactory.Core().V1().Services().Informer().AddEventHandler(c.handler)
c.kubernetesFactory.Core().V1().Endpoints().Informer().AddEventHandler(c.handler)

// Create a new SharedInformerFactory, and register the event handler to informers.
c.meshFactory = informers.NewSharedInformerFactoryWithOptions(c.clients.KubeClient,
k8s.ResyncPeriod,
informers.WithNamespace(c.meshNamespace),
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = "component==maesh-mesh"
}),
)
c.meshFactory.Core().V1().Pods().Informer().AddEventHandler(c.meshHandler)
c.kubernetesFactory.Core().V1().Pods().Informer().AddEventHandler(c.handler)

c.tcpStateTable = &k8s.State{Table: make(map[int]*k8s.ServiceWithPort)}

Expand Down Expand Up @@ -151,14 +134,14 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {
case <-stopCh:
log.Info("Shutting down workers")
return nil
case <-c.configRefreshChan:
case message := <-c.configRefreshChan:
// Reload the configuration
conf, err := c.provider.BuildConfig()
if err != nil {
return err
}

if !reflect.DeepEqual(c.lastConfiguration.Get(), conf) {
if message == k8s.ConfigMessageChanForce || !reflect.DeepEqual(c.lastConfiguration.Get(), conf) {
c.lastConfiguration.Set(conf)

if deployErr := c.deployConfiguration(conf); deployErr != nil {
Expand All @@ -180,14 +163,6 @@ func (c *Controller) startInformers(stopCh <-chan struct{}) {
}
}

c.meshFactory.Start(stopCh)

for t, ok := range c.meshFactory.WaitForCacheSync(stopCh) {
if !ok {
log.Errorf("timed out waiting for controller caches to sync: %s", t.String())
}
}

if c.smiEnabled {
c.smiAccessFactory.Start(stopCh)

Expand Down
21 changes: 13 additions & 8 deletions internal/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
// Handler is an implementation of a ResourceEventHandler.
type Handler struct {
ignored k8s.IgnoreWrapper
configRefreshChan chan bool
configRefreshChan chan string
createMeshServiceFunc func(service *corev1.Service) error
updateMeshServiceFunc func(oldUserService *corev1.Service, newUserService *corev1.Service) (*corev1.Service, error)
deleteMeshServiceFunc func(serviceName, serviceNamespace string) error
}

// NewHandler creates a handler.
func NewHandler(ignored k8s.IgnoreWrapper, configRefreshChan chan bool) *Handler {
func NewHandler(ignored k8s.IgnoreWrapper, configRefreshChan chan string) *Handler {
h := &Handler{
ignored: ignored,
configRefreshChan: configRefreshChan,
Expand Down Expand Up @@ -63,8 +63,8 @@ func (h *Handler) OnAdd(obj interface{}) {
}
}

// Trigger a configuration refresh.
h.configRefreshChan <- true
// Trigger a configuration rebuild.
h.configRefreshChan <- k8s.ConfigMessageChanRebuild
}

// OnUpdate executed when an object is updated.
Expand All @@ -90,14 +90,19 @@ func (h *Handler) OnUpdate(oldObj, newObj interface{}) {
log.Debugf("MeshControllerHandler ObjectUpdated with type: *corev1.Endpoints: %s/%s", obj.Namespace, obj.Name)
case *corev1.Pod:
if !isMeshPod(obj) {
// We don't track updates of user pods, updates are done through endpoints.
return
}

log.Debugf("MeshControllerHandler ObjectUpdated with type: *corev1.Pod: %s/%s", obj.Namespace, obj.Name)
// Since this is a mesh pod update, trigger a force deploy.
h.configRefreshChan <- k8s.ConfigMessageChanForce

return
}

// Trigger a configuration refresh.
h.configRefreshChan <- true
// Trigger a configuration rebuild.
h.configRefreshChan <- k8s.ConfigMessageChanRebuild
}

// OnDelete executed when an object is deleted.
Expand All @@ -124,6 +129,6 @@ func (h *Handler) OnDelete(obj interface{}) {
return
}

// Trigger a configuration refresh.
h.configRefreshChan <- true
// Trigger a configuration rebuild.
h.configRefreshChan <- k8s.ConfigMessageChanRebuild
}
5 changes: 5 additions & 0 deletions internal/k8s/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,9 @@ const (

// TCPStateConfigMapName TCP config map name.
TCPStateConfigMapName string = "tcp-state-table"

// ConfigMessageChanRebuild rebuild.
ConfigMessageChanRebuild string = "rebuild"
// ConfigMessageChanForce force.
ConfigMessageChanForce string = "force"
)

0 comments on commit a5e73c1

Please sign in to comment.