Skip to content

Commit

Permalink
Merge pull request #16039 from pravisankar/fix-route-delay
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue.

Sharded router based on namespace labels should notice routes immediately

- Currently, sharded router based on namespace labels could take 2 resync
  intervals (10 to 15 mins) to notice new routes which may not be acceptable
  to some customers. This change allows routes to work immediately just like
  the non-sharded router behavior.

- Watching project resource may not guarantee the order of the events,
  so there is no behavior change to shared router based on project labels.

Trello card: https://trello.com/c/Q0puUQOT
Rebased on top of #16315
  • Loading branch information
openshift-merge-robot committed Oct 16, 2017
2 parents e86bd01 + c9f43f7 commit fb7cdc9
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 117 deletions.
44 changes: 3 additions & 41 deletions pkg/cmd/infra/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@ import (
"github.com/golang/glog"
"github.com/spf13/pflag"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"

cmdutil "github.com/openshift/origin/pkg/cmd/util"
"github.com/openshift/origin/pkg/cmd/util/variable"
Expand Down Expand Up @@ -212,18 +210,18 @@ func (o *RouterSelection) Complete() error {

// NewFactory initializes a factory that will watch the requested routes
func (o *RouterSelection) NewFactory(routeclient routeinternalclientset.Interface, projectclient projectclient.ProjectResourceInterface, kc kclientset.Interface) *controllerfactory.RouterControllerFactory {
factory := controllerfactory.NewDefaultRouterControllerFactory(routeclient, kc)
factory := controllerfactory.NewDefaultRouterControllerFactory(routeclient, projectclient, kc)
factory.LabelSelector = o.LabelSelector
factory.FieldSelector = o.FieldSelector
factory.Namespace = o.Namespace
factory.ResyncInterval = o.ResyncInterval
switch {
case o.NamespaceLabels != nil:
glog.Infof("Router is only using routes in namespaces matching %s", o.NamespaceLabels)
factory.Namespaces = namespaceNames{kc.Core().Namespaces(), o.NamespaceLabels}
factory.NamespaceLabels = o.NamespaceLabels
case o.ProjectLabels != nil:
glog.Infof("Router is only using routes in projects matching %s", o.ProjectLabels)
factory.Namespaces = projectNames{projectclient, o.ProjectLabels}
factory.ProjectLabels = o.ProjectLabels
case len(factory.Namespace) > 0:
glog.Infof("Router is only using resources in namespace %s", factory.Namespace)
default:
Expand All @@ -232,42 +230,6 @@ func (o *RouterSelection) NewFactory(routeclient routeinternalclientset.Interfac
return factory
}

// projectNames returns the names of projects matching the label selector
type projectNames struct {
client projectclient.ProjectResourceInterface
selector labels.Selector
}

func (n projectNames) NamespaceNames() (sets.String, error) {
all, err := n.client.List(metav1.ListOptions{LabelSelector: n.selector.String()})
if err != nil {
return nil, err
}
names := make(sets.String, len(all.Items))
for i := range all.Items {
names.Insert(all.Items[i].Name)
}
return names, nil
}

// namespaceNames returns the names of namespaces matching the label selector
type namespaceNames struct {
client kcoreclient.NamespaceInterface
selector labels.Selector
}

func (n namespaceNames) NamespaceNames() (sets.String, error) {
all, err := n.client.List(metav1.ListOptions{LabelSelector: n.selector.String()})
if err != nil {
return nil, err
}
names := make(sets.String, len(all.Items))
for i := range all.Items {
names.Insert(all.Items[i].Name)
}
return names, nil
}

func envVarAsStrings(name, defaultValue, separator string) []string {
strlist := []string{}
if env := cmdutil.Env(name, defaultValue); env != "" {
Expand Down
84 changes: 65 additions & 19 deletions pkg/router/controller/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@ import (
"github.com/golang/glog"

"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
kcache "k8s.io/client-go/tools/cache"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"

projectclient "github.com/openshift/origin/pkg/project/generated/internalclientset/typed/project/internalversion"
routeapi "github.com/openshift/origin/pkg/route/apis/route"
routeclientset "github.com/openshift/origin/pkg/route/generated/internalclientset"
"github.com/openshift/origin/pkg/router"
Expand All @@ -29,23 +32,26 @@ import (
// controller. It supports optional scoping on Namespace, Labels, and Fields of routes.
// If Namespace is empty, it means "all namespaces".
type RouterControllerFactory struct {
KClient kclientset.Interface
RClient routeclientset.Interface
KClient kclientset.Interface
RClient routeclientset.Interface
ProjectClient projectclient.ProjectResourceInterface

Namespaces routercontroller.NamespaceLister
ResyncInterval time.Duration
Namespace string
LabelSelector string
FieldSelector string
ResyncInterval time.Duration
Namespace string
LabelSelector string
FieldSelector string
NamespaceLabels labels.Selector
ProjectLabels labels.Selector

informers map[reflect.Type]kcache.SharedIndexInformer
}

// NewDefaultRouterControllerFactory initializes a default router controller factory.
func NewDefaultRouterControllerFactory(rc routeclientset.Interface, kc kclientset.Interface) *RouterControllerFactory {
func NewDefaultRouterControllerFactory(rc routeclientset.Interface, pc projectclient.ProjectResourceInterface, kc kclientset.Interface) *RouterControllerFactory {
return &RouterControllerFactory{
KClient: kc,
RClient: rc,
ProjectClient: pc,
ResyncInterval: 10 * time.Minute,

Namespace: v1.NamespaceAll,
Expand All @@ -57,17 +63,23 @@ func NewDefaultRouterControllerFactory(rc routeclientset.Interface, kc kclientse
// resources. It spawns child goroutines that cannot be terminated.
func (f *RouterControllerFactory) Create(plugin router.Plugin, watchNodes, enableIngress bool) *routercontroller.RouterController {
rc := &routercontroller.RouterController{
Plugin: plugin,
Namespaces: f.Namespaces,
// check namespaces a bit more often than we resync events, so that we aren't always waiting
Plugin: plugin,
WatchNodes: watchNodes,
EnableIngress: enableIngress,
IngressTranslator: routercontroller.NewIngressTranslator(f.KClient.Core()),

NamespaceLabels: f.NamespaceLabels,
FilteredNamespaceNames: make(sets.String),
NamespaceRoutes: make(map[string]map[string]*routeapi.Route),
NamespaceEndpoints: make(map[string]map[string]*kapi.Endpoints),

ProjectClient: f.ProjectClient,
ProjectLabels: f.ProjectLabels,
// Check projects a bit more often than we resync events, so that we aren't always waiting
// the maximum interval for new items to come into the list
// TODO: trigger a reflector resync after every namespace sync?
NamespaceSyncInterval: f.ResyncInterval - 10*time.Second,
NamespaceWaitInterval: 10 * time.Second,
NamespaceRetries: 5,
WatchNodes: watchNodes,
EnableIngress: enableIngress,
IngressTranslator: routercontroller.NewIngressTranslator(f.KClient.Core()),
ProjectSyncInterval: f.ResyncInterval - 10*time.Second,
ProjectWaitInterval: 10 * time.Second,
ProjectRetries: 5,
}

f.initInformers(rc)
Expand All @@ -77,13 +89,15 @@ func (f *RouterControllerFactory) Create(plugin router.Plugin, watchNodes, enabl
}

func (f *RouterControllerFactory) initInformers(rc *routercontroller.RouterController) {
if f.NamespaceLabels != nil {
f.createNamespacesSharedInformer(rc)
}
f.createEndpointsSharedInformer(rc)
f.createRoutesSharedInformer(rc)

if rc.WatchNodes {
f.createNodesSharedInformer(rc)
}

if rc.EnableIngress {
f.createIngressesSharedInformer(rc)
f.createSecretsSharedInformer(rc)
Expand All @@ -102,6 +116,9 @@ func (f *RouterControllerFactory) initInformers(rc *routercontroller.RouterContr
}

func (f *RouterControllerFactory) registerInformerEventHandlers(rc *routercontroller.RouterController) {
if f.NamespaceLabels != nil {
f.registerSharedInformerEventHandlers(&kapi.Namespace{}, rc.HandleNamespace)
}
f.registerSharedInformerEventHandlers(&kapi.Endpoints{}, rc.HandleEndpoints)
f.registerSharedInformerEventHandlers(&routeapi.Route{}, rc.HandleRoute)

Expand Down Expand Up @@ -135,6 +152,17 @@ func (f *RouterControllerFactory) informerStoreList(obj runtime.Object) []interf
// - Perform first router sync
// - Register informer event handlers for new updates and resyncs
func (f *RouterControllerFactory) processExistingItems(rc *routercontroller.RouterController) {
if f.NamespaceLabels != nil {
items := f.informerStoreList(&kapi.Namespace{})
if len(items) == 0 {
rc.UpdateNamespaces()
} else {
for _, item := range items {
rc.HandleNamespace(watch.Added, item.(*kapi.Namespace))
}
}
}

for _, item := range f.informerStoreList(&kapi.Endpoints{}) {
rc.HandleEndpoints(watch.Added, item.(*kapi.Endpoints))
}
Expand Down Expand Up @@ -255,6 +283,24 @@ func (f *RouterControllerFactory) createSecretsSharedInformer(rc *routercontroll
f.informers[objType] = informer
}

func (f *RouterControllerFactory) createNamespacesSharedInformer(rc *routercontroller.RouterController) {
lw := &kcache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
options.LabelSelector = f.NamespaceLabels.String()
return f.KClient.Core().Namespaces().List(options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
options.LabelSelector = f.NamespaceLabels.String()
return f.KClient.Core().Namespaces().Watch(options)
},
}
ns := &kapi.Namespace{}
objType := reflect.TypeOf(ns)
indexers := kcache.Indexers{kcache.NamespaceIndex: kcache.MetaNamespaceIndexFunc}
informer := kcache.NewSharedIndexInformer(lw, ns, f.ResyncInterval, indexers)
f.informers[objType] = informer
}

func (f *RouterControllerFactory) registerSharedInformerEventHandlers(obj runtime.Object,
handleFunc func(watch.EventType, interface{})) {
objType := reflect.TypeOf(obj)
Expand Down
Loading

0 comments on commit fb7cdc9

Please sign in to comment.