diff --git a/interceptor/config/serving.go b/interceptor/config/serving.go
index 055ee4c9..10068ef4 100644
--- a/interceptor/config/serving.go
+++ b/interceptor/config/serving.go
@@ -1,6 +1,8 @@
 package config
 
 import (
+	"time"
+
 	"github.com/kelseyhightower/envconfig"
 )
 
@@ -16,15 +18,9 @@ type Serving struct {
 	// This is the server that the external scaler will issue metrics
 	// requests to
 	AdminPort int `envconfig:"KEDA_HTTP_ADMIN_PORT" required:"true"`
-	// RoutingTableUpdateDurationMS is the interval (in milliseconds) representing how
-	// often to do a complete update of the routing table ConfigMap.
-	//
-	// The interceptor will also open a watch stream to the routing table
-	// ConfigMap and attempt to update the routing table on every update.
-	//
-	// Since it does full updates alongside watch stream updates, it can
-	// only process one at a time. Therefore, this is a best effort timeout
-	RoutingTableUpdateDurationMS int `envconfig:"KEDA_HTTP_ROUTING_TABLE_UPDATE_DURATION_MS" default:"500"`
+	// ConfigMapCacheRsyncPeriod is the time interval
+	// for the ConfigMap informer to resync its local cache.
+	ConfigMapCacheRsyncPeriod time.Duration `envconfig:"KEDA_HTTP_SCALER_CONFIG_MAP_INFORMER_RSYNC_PERIOD" default:"60m"`
 	// The interceptor has an internal process that periodically fetches the state
 	// of deployment that is running the servers it forwards to.
 	//
diff --git a/interceptor/main.go b/interceptor/main.go
index 9b744e49..7e877a2c 100644
--- a/interceptor/main.go
+++ b/interceptor/main.go
@@ -73,6 +73,15 @@ func main() {
 	q := queue.NewMemory()
 	routingTable := routing.NewTable()
 
+	// Create an informer for the ConfigMap resource.
+	// The resynchronization period of the informer should not be less than 1s,
+	// refer to: https://github.com/kubernetes/client-go/blob/v0.22.2/tools/cache/shared_informer.go#L475
+	configMapInformer := k8s.NewInformerConfigMapUpdater(
+		lggr,
+		cl,
+		servingCfg.ConfigMapCacheRsyncPeriod,
+	)
+
 	lggr.Info(
 		"Fetching initial routing table",
 	)
@@ -109,10 +118,11 @@
 		err := routing.StartConfigMapRoutingTableUpdater(
 			ctx,
 			lggr,
-			time.Duration(servingCfg.RoutingTableUpdateDurationMS)*time.Millisecond,
-			configMapsInterface,
+			configMapInformer,
+			servingCfg.CurrentNamespace,
 			routingTable,
 			q,
+			nil,
 		)
 		lggr.Error(err, "config map routing table updater failed")
 		return err
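For readers checking the new `ConfigMapCacheRsyncPeriod` setting: envconfig decodes `time.Duration` fields with `time.ParseDuration` semantics, so the `default:"60m"` tag yields one hour between cache resyncs. A minimal standalone sketch (not part of this diff; the empty prefix and the `main` wrapper are illustrative):

```go
package main

import (
	"fmt"
	"time"

	"github.com/kelseyhightower/envconfig"
)

// serving mirrors just the new field to show how the env var is parsed.
type serving struct {
	ConfigMapCacheRsyncPeriod time.Duration `envconfig:"KEDA_HTTP_SCALER_CONFIG_MAP_INFORMER_RSYNC_PERIOD" default:"60m"`
}

func main() {
	var s serving
	// An empty prefix makes envconfig read the tag names verbatim.
	if err := envconfig.Process("", &s); err != nil {
		panic(err)
	}
	// Prints "1h0m0s" when the env var is unset; values such as
	// "90s" or "5m" are accepted via time.ParseDuration.
	fmt.Println(s.ConfigMapCacheRsyncPeriod)
}
```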
diff --git a/pkg/k8s/config_map_cache_informer.go b/pkg/k8s/config_map_cache_informer.go
new file mode 100644
index 00000000..bb53e388
--- /dev/null
+++ b/pkg/k8s/config_map_cache_informer.go
@@ -0,0 +1,132 @@
+package k8s
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"time"
+
+	"github.com/go-logr/logr"
+	"github.com/pkg/errors"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/informers"
+	infcorev1 "k8s.io/client-go/informers/core/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+)
+
+type InformerConfigMapUpdater struct {
+	lggr       logr.Logger
+	cmInformer infcorev1.ConfigMapInformer
+	bcaster    *watch.Broadcaster
+}
+
+func (i *InformerConfigMapUpdater) MarshalJSON() ([]byte, error) {
+	lst := i.cmInformer.Lister()
+	cms, err := lst.List(labels.Everything())
+	if err != nil {
+		return nil, err
+	}
+	return json.Marshal(&cms)
+}
+
+func (i *InformerConfigMapUpdater) Start(ctx context.Context) error {
+	i.cmInformer.Informer().Run(ctx.Done())
+	return errors.Wrap(
+		ctx.Err(),
+		"configMap informer was stopped",
+	)
+}
+
+func (i *InformerConfigMapUpdater) Get(
+	ns,
+	name string,
+) (corev1.ConfigMap, error) {
+	cm, err := i.cmInformer.Lister().ConfigMaps(ns).Get(name)
+	if err != nil {
+		return corev1.ConfigMap{}, err
+	}
+	return *cm, nil
+}
+
+func (i *InformerConfigMapUpdater) Watch(
+	ns,
+	name string,
+) watch.Interface {
+	return watch.Filter(i.bcaster.Watch(), func(e watch.Event) (watch.Event, bool) {
+		cm, ok := e.Object.(*corev1.ConfigMap)
+		if !ok {
+			i.lggr.Error(
+				fmt.Errorf("informer expected ConfigMap, ignoring this event"),
+				"event",
+				e,
+			)
+			return e, false
+		}
+		if cm.Namespace == ns && cm.Name == name {
+			return e, true
+		}
+		return e, false
+	})
+}
+
+func (i *InformerConfigMapUpdater) addEvtHandler(obj interface{}) {
+	cm, ok := obj.(*corev1.ConfigMap)
+	if !ok {
+		i.lggr.Error(
+			fmt.Errorf("informer expected configMap, got %v", obj),
+			"not forwarding event",
+		)
+		return
+	}
+	i.bcaster.Action(watch.Added, cm)
+}
+
+func (i *InformerConfigMapUpdater) updateEvtHandler(oldObj, newObj interface{}) {
+	cm, ok := newObj.(*corev1.ConfigMap)
+	if !ok {
+		i.lggr.Error(
+			fmt.Errorf("informer expected configMap, got %v", newObj),
+			"not forwarding event",
+		)
+		return
+	}
+	i.bcaster.Action(watch.Modified, cm)
+}
+
+func (i *InformerConfigMapUpdater) deleteEvtHandler(obj interface{}) {
+	cm, ok := obj.(*corev1.ConfigMap)
+	if !ok {
+		i.lggr.Error(
+			fmt.Errorf("informer expected configMap, got %v", obj),
+			"not forwarding event",
+		)
+		return
+	}
+	i.bcaster.Action(watch.Deleted, cm)
+}
+
+func NewInformerConfigMapUpdater(
+	lggr logr.Logger,
+	cl kubernetes.Interface,
+	defaultResync time.Duration,
+) *InformerConfigMapUpdater {
+	factory := informers.NewSharedInformerFactory(
+		cl,
+		defaultResync,
+	)
+	cmInformer := factory.Core().V1().ConfigMaps()
+	ret := &InformerConfigMapUpdater{
+		lggr:       lggr,
+		bcaster:    watch.NewBroadcaster(0, watch.WaitIfChannelFull),
+		cmInformer: cmInformer,
+	}
+	ret.cmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc:    ret.addEvtHandler,
+		UpdateFunc: ret.updateEvtHandler,
+		DeleteFunc: ret.deleteEvtHandler,
+	})
+	return ret
+}
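A quick usage sketch of the new updater type (not part of the diff): it wraps a SharedInformerFactory, so `Start` blocks while the informer runs, and `Watch` returns a `watch.Interface` that only passes through events for the requested namespace/name. The fake clientset, `logr.Discard()`, and the `keda`/`routing-table` names below are illustrative assumptions:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"

	"github.com/kedacore/http-add-on/pkg/k8s"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// A fake clientset stands in for a real cluster.
	cl := fake.NewSimpleClientset()
	updater := k8s.NewInformerConfigMapUpdater(logr.Discard(), cl, time.Minute)

	// Start blocks until ctx is done, so run it in the background.
	go func() { _ = updater.Start(ctx) }()

	// Only events for "keda/routing-table" make it through this watch;
	// every other ConfigMap is dropped by the broadcaster filter.
	w := updater.Watch("keda", "routing-table")
	defer w.Stop()

	cm := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{Namespace: "keda", Name: "routing-table"},
	}
	if _, err := cl.CoreV1().ConfigMaps("keda").Create(ctx, cm, metav1.CreateOptions{}); err != nil {
		panic(err)
	}

	evt := <-w.ResultChan()
	fmt.Println(evt.Type) // ADDED
}
```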
diff --git a/pkg/routing/config_map_updater.go b/pkg/routing/config_map_updater.go
index 67a58f33..a1b2f723 100644
--- a/pkg/routing/config_map_updater.go
+++ b/pkg/routing/config_map_updater.go
@@ -2,15 +2,13 @@ package routing
 
 import (
 	"context"
-	"time"
 
 	"github.com/go-logr/logr"
 	"github.com/kedacore/http-add-on/pkg/k8s"
 	"github.com/kedacore/http-add-on/pkg/queue"
 	"github.com/pkg/errors"
+	"golang.org/x/sync/errgroup"
 	corev1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/watch"
 )
 
 // StartConfigMapRoutingTableUpdater starts a loop that does the following:
@@ -21,49 +19,42 @@ import (
 // called ConfigMapRoutingTableName. On either of those events, decodes
 // that ConfigMap into a routing table and stores the new table into table
 // using table.Replace(newTable)
+// - Executes the callback function, if one exists
 // - Returns an appropriate non-nil error if ctx.Done() receives
 func StartConfigMapRoutingTableUpdater(
 	ctx context.Context,
 	lggr logr.Logger,
-	updateEvery time.Duration,
-	getterWatcher k8s.ConfigMapGetterWatcher,
+	cmInformer *k8s.InformerConfigMapUpdater,
+	ns string,
 	table *Table,
 	q queue.Counter,
+	cbFunc func() error,
 ) error {
 	lggr = lggr.WithName("pkg.routing.StartConfigMapRoutingTableUpdater")
-	watchIface, err := getterWatcher.Watch(ctx, metav1.ListOptions{})
-	if err != nil {
-		return err
-	}
-	defer watchIface.Stop()
-	ticker := time.NewTicker(updateEvery)
-	defer ticker.Stop()
-	for {
-		select {
-		case <-ctx.Done():
-			return errors.Wrap(ctx.Err(), "context is done")
-		case <-ticker.C:
-			if err := GetTable(ctx, lggr, getterWatcher, table, q); err != nil {
-				return errors.Wrap(err, "failed to fetch routing table")
-			}
+	watcher := cmInformer.Watch(ns, ConfigMapRoutingTableName)
+	defer watcher.Stop()
 
-		case evt := <-watchIface.ResultChan():
-			evtType := evt.Type
-			obj := evt.Object
-			if evtType == watch.Added || evtType == watch.Modified {
-				cm, ok := obj.(*corev1.ConfigMap)
-				// by definition of watchIface, all returned objects should
-				// be assertable to a ConfigMap. In the likely impossible
-				// case that it isn't, just ignore and move on.
-				// This check is just to be defensive.
+	ctx, done := context.WithCancel(ctx)
+	defer done()
+	grp, ctx := errgroup.WithContext(ctx)
+
+	grp.Go(func() error {
+		defer done()
+		return cmInformer.Start(ctx)
+	})
+
+	grp.Go(func() error {
+		defer done()
+		for {
+			select {
+			case event := <-watcher.ResultChan():
+				cm, ok := event.Object.(*corev1.ConfigMap)
+				// Theoretically this should never happen
 				if !ok {
-					continue
-				}
-				// the watcher is open on all ConfigMaps in the namespace, so
-				// bail out of this loop iteration immediately if the event
-				// isn't for the routing table ConfigMap.
-				if cm.Name != ConfigMapRoutingTableName {
+					lggr.Info(
+						"the observed event object is not a ConfigMap",
+					)
 					continue
 				}
 				newTable, err := FetchTableFromConfigMap(cm, q)
@@ -81,8 +72,25 @@ func StartConfigMapRoutingTableUpdater(
 				)
 				continue
 			}
+			// Execute the callback function, if one exists
+			if cbFunc != nil {
+				if err := cbFunc(); err != nil {
+					lggr.Error(
+						err,
+						"failed to exec the callback function",
+					)
+					continue
+				}
+			}
+		case <-ctx.Done():
+			return errors.Wrap(ctx.Err(), "context is done")
 		}
 	}
-	}
+	})
+	if err := grp.Wait(); err != nil {
+		lggr.Error(err, "config map routing updater failed")
+		return err
+	}
+	return nil
 }
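The updater now leans on a small but easy-to-miss pattern: two errgroup goroutines share one cancellable context, so a failure in either the blocking informer or the event loop tears the other down, and `grp.Wait` surfaces the first error. A minimal standalone sketch of that pattern (the channel contents and timings are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// run mirrors the updater's shape: one goroutine stands in for the
// blocking informer, another consumes events, and cancelling the
// shared context stops both.
func run(ctx context.Context, events <-chan string) error {
	ctx, done := context.WithCancel(ctx)
	defer done()
	grp, ctx := errgroup.WithContext(ctx)

	grp.Go(func() error {
		defer done()
		<-ctx.Done() // stands in for the blocking informer Run
		return ctx.Err()
	})

	grp.Go(func() error {
		defer done()
		for {
			select {
			case e := <-events:
				fmt.Println("event:", e)
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})

	// Wait returns the first non-nil error from either goroutine.
	return grp.Wait()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	evts := make(chan string, 1)
	evts <- "routing-table updated"
	fmt.Println(run(ctx, evts)) // context deadline exceeded
}
```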
diff --git a/pkg/routing/config_map_updater_test.go b/pkg/routing/config_map_updater_test.go
index 78c10fa5..43a9096f 100644
--- a/pkg/routing/config_map_updater_test.go
+++ b/pkg/routing/config_map_updater_test.go
@@ -3,6 +3,7 @@ package routing
 import (
 	"context"
 	"errors"
+	"strings"
 	"testing"
 	"time"
 
@@ -41,10 +42,6 @@ import (
 // approach. The fake watcher documentation is linked below:
 //
 // (https://pkg.go.dev/k8s.io/apimachinery@v0.21.3/pkg/watch#NewFake),
-type fakeCMGetterWatcher struct {
-	k8s.ConfigMapGetter
-	k8s.ConfigMapWatcher
-}
 
 type fakeConfigMapWatcher struct {
 	watchIface watch.Interface
@@ -88,15 +85,13 @@ func TestStartUpdateLoop(t *testing.T) {
 	}
 	r.NoError(SaveTableToConfigMap(table, cm))
 
-	fakeWatcher := watch.NewFake()
 	fakeGetter := fake.NewSimpleClientset(cm)
-	getterWatcher := fakeCMGetterWatcher{
-		ConfigMapGetter: fakeGetter.
-			CoreV1().
-			ConfigMaps(ns),
-		ConfigMapWatcher: fakeConfigMapWatcher{fakeWatcher},
-	}
-	defer fakeWatcher.Stop()
+
+	configMapInformer := k8s.NewInformerConfigMapUpdater(
+		lggr,
+		fakeGetter,
+		time.Second*1,
+	)
 
 	grp, ctx := errgroup.WithContext(ctx)
 
@@ -104,10 +99,11 @@
 		err := StartConfigMapRoutingTableUpdater(
 			ctx,
 			lggr,
-			interval,
-			getterWatcher,
+			configMapInformer,
+			ns,
 			table,
 			q,
+			nil,
 		)
 		// we purposefully cancel the context below,
 		// so we need to ignore that error.
@@ -120,7 +116,26 @@
 	// send a watch event in parallel. we'll ensure that it
 	// made it through in the below loop
 	grp.Go(func() error {
-		fakeWatcher.Add(cm)
+		if _, err := fakeGetter.
+			CoreV1().
+			ConfigMaps(ns).
+			Create(ctx, cm, metav1.CreateOptions{}); err != nil && strings.Contains(
+			err.Error(),
+			"already exists",
+		) {
+			if err := fakeGetter.
+				CoreV1().
+				ConfigMaps(ns).
+				Delete(ctx, cm.Name, metav1.DeleteOptions{}); err != nil {
+				return err
+			}
+			if _, err := fakeGetter.
+				CoreV1().
+				ConfigMaps(ns).
+				Create(ctx, cm, metav1.CreateOptions{}); err != nil {
+				return err
+			}
+		}
 		return nil
 	})
 
@@ -129,6 +144,12 @@
 	const waitDur = interval * 5
 	time.Sleep(waitDur)
 
+	_, err := fakeGetter.
+		CoreV1().
+		ConfigMaps(ns).
+		Get(ctx, ConfigMapRoutingTableName, metav1.GetOptions{})
+	r.NoError(err)
+
 	for _, action := range fakeGetter.Actions() {
 		verb := action.GetVerb()
 		resource := action.GetResource().Resource
diff --git a/scaler/config.go b/scaler/config.go
index d770a0d1..3a09d34e 100644
--- a/scaler/config.go
+++ b/scaler/config.go
@@ -26,9 +26,9 @@ type config struct {
 	// KEDA, if that value is not set on an incoming
 	// `HTTPScaledObject`
 	TargetPendingRequests int `envconfig:"KEDA_HTTP_SCALER_TARGET_PENDING_REQUESTS" default:"100"`
-	// UpdateRoutingTableDur is the duration between manual
-	// updates to the routing table.
-	UpdateRoutingTableDur time.Duration `envconfig:"KEDA_HTTP_SCALER_ROUTING_TABLE_UPDATE_DUR" default:"100ms"`
+	// ConfigMapCacheRsyncPeriod is the time interval
+	// for the ConfigMap informer to resync its local cache.
+	ConfigMapCacheRsyncPeriod time.Duration `envconfig:"KEDA_HTTP_SCALER_CONFIG_MAP_INFORMER_RSYNC_PERIOD" default:"60m"`
 	// QueueTickDuration is the duration between queue requests
 	QueueTickDuration time.Duration `envconfig:"KEDA_HTTP_QUEUE_TICK_DURATION" default:"500ms"`
 	// This will be the 'Target Pending Requests' for the interceptor
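An aside on the test's create-then-recreate step: matching on the "already exists" substring works against the fake clientset, but apimachinery exposes a typed check for the same condition. A sketch of equivalent logic using `apierrors.IsAlreadyExists` (the `recreateConfigMap` helper is hypothetical, not part of this diff):

```go
package main

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// recreateConfigMap creates cm, and if one with the same name already
// exists, deletes it and creates again. The typed apierrors.IsAlreadyExists
// check replaces substring-matching on the "already exists" error text.
func recreateConfigMap(ctx context.Context, cl kubernetes.Interface, cm *corev1.ConfigMap) error {
	cms := cl.CoreV1().ConfigMaps(cm.Namespace)
	_, err := cms.Create(ctx, cm, metav1.CreateOptions{})
	if apierrors.IsAlreadyExists(err) {
		if err := cms.Delete(ctx, cm.Name, metav1.DeleteOptions{}); err != nil {
			return err
		}
		_, err = cms.Create(ctx, cm, metav1.CreateOptions{})
	}
	return err
}

func main() {
	ctx := context.Background()
	cl := fake.NewSimpleClientset()
	cm := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "routing-table"},
	}
	// The first call creates; the second exercises the AlreadyExists path.
	for i := 0; i < 2; i++ {
		if err := recreateConfigMap(ctx, cl, cm); err != nil {
			panic(err)
		}
	}
}
```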
diff --git a/scaler/main.go b/scaler/main.go
index 83fe89e3..7f0c2c29 100644
--- a/scaler/main.go
+++ b/scaler/main.go
@@ -62,8 +62,27 @@ func main() {
 		os.Exit(1)
 	}
 
+	// This callback fetches and saves the current queue
+	// counts from the interceptor immediately after the
+	// routing table has been updated.
+	callbackWhenRoutingTableUpdate := func() error {
+		if err := pinger.fetchAndSaveCounts(ctx); err != nil {
+			return err
+		}
+		return nil
+	}
+
 	table := routing.NewTable()
 
+	// Create an informer for the ConfigMap resource.
+	// The resynchronization period of the informer should not be less than 1s,
+	// refer to: https://github.com/kubernetes/client-go/blob/v0.22.2/tools/cache/shared_informer.go#L475
+	configMapInformer := k8s.NewInformerConfigMapUpdater(
+		lggr,
+		k8sCl,
+		cfg.ConfigMapCacheRsyncPeriod,
+	)
+
 	grp, ctx := errgroup.WithContext(ctx)
 
 	grp.Go(func() error {
@@ -92,16 +111,18 @@
 		return routing.StartConfigMapRoutingTableUpdater(
 			ctx,
 			lggr,
-			cfg.UpdateRoutingTableDur,
-			k8sCl.CoreV1().ConfigMaps(cfg.TargetNamespace),
+			configMapInformer,
+			cfg.TargetNamespace,
 			table,
 			// we don't care about the queue here.
 			// we just want to update the routing table
 			// so that the scaler can use it to determine
 			// the target metrics for given hosts.
 			queue.NewMemory(),
+			callbackWhenRoutingTableUpdate,
 		)
 	})
+
 	grp.Go(func() error {
 		defer done()
 		return startHealthcheckServer(
diff --git a/scaler/main_test.go b/scaler/main_test.go
index 5621fec7..43b2d854 100644
--- a/scaler/main_test.go
+++ b/scaler/main_test.go
@@ -29,7 +29,6 @@ func TestHealthChecks(t *testing.T) {
 		TargetService:         "testsvc",
 		TargetPort:            port + 123,
 		TargetPendingRequests: 100,
-		UpdateRoutingTableDur: 100 * time.Millisecond,
 	}
 
 	errgrp, ctx := errgroup.WithContext(ctx)
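Putting the pieces together, a minimal sketch of the new `StartConfigMapRoutingTableUpdater` wiring (not part of the diff; the namespace, fake clientset, and discard logger are illustrative). The final argument is the post-update callback: the scaler passes `callbackWhenRoutingTableUpdate`, while the interceptor passes `nil` to skip it:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-logr/logr"
	"k8s.io/client-go/kubernetes/fake"

	"github.com/kedacore/http-add-on/pkg/k8s"
	"github.com/kedacore/http-add-on/pkg/queue"
	"github.com/kedacore/http-add-on/pkg/routing"
)

func main() {
	// Bound the run so the blocking updater returns for this demo.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	lggr := logr.Discard()
	informer := k8s.NewInformerConfigMapUpdater(lggr, fake.NewSimpleClientset(), time.Minute)

	err := routing.StartConfigMapRoutingTableUpdater(
		ctx,
		lggr,
		informer,
		"keda", // illustrative namespace
		routing.NewTable(),
		queue.NewMemory(),
		func() error {
			// Runs after every successful table.Replace.
			fmt.Println("routing table replaced")
			return nil
		},
	)
	fmt.Println(err) // a wrapped context error once ctx expires
}
```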