use a single informer for resolver (#71)
* use a single informer for resolver
Maanas-23 authored Jan 27, 2025
1 parent b629587 commit ddb5d2d
Showing 6 changed files with 99 additions and 58 deletions.
5 changes: 5 additions & 0 deletions operator/internal/controller/elastiservice_controller.go
@@ -40,6 +40,7 @@ type (
const (

// These are resolver details, ideally in future we can move this to a configmap, or find a better way to serve this
// TODO: Move this to configmap
resolverNamespace = "elasti"
resolverDeploymentName = "elasti-resolver"
resolverServiceName = "elasti-resolver-service"
@@ -120,6 +121,7 @@ func (r *ElastiServiceReconciler) Reconcile(ctx context.Context, req ctrl.Reques
crddirectory.AddCRD(svcNamespacedName.String(), &crddirectory.CRDDetails{
CRDName: es.Name,
Spec: es.Spec,
Status: es.Status,
})
r.Logger.Info("CRD added to service directory", zap.String("es", req.String()), zap.String("service", es.Spec.Service))
return res, nil
@@ -144,6 +146,9 @@ func (r *ElastiServiceReconciler) Initialize(ctx context.Context) error {
if err := r.reconcileExistingCRDs(ctx); err != nil {
return fmt.Errorf("failed to reconcile existing CRDs: %w", err)
}
if err := r.InformerManager.InitializeResolverInformer(r.getResolverChangeHandler(ctx)); err != nil {
return fmt.Errorf("failed to initialize resolver informer: %w", err)
}
return nil
}

48 changes: 36 additions & 12 deletions operator/internal/controller/opsDeployment.go
@@ -3,8 +3,9 @@ package controller
import (
"context"
"fmt"

"strings"
"truefoundry/elasti/operator/api/v1alpha1"
"truefoundry/elasti/operator/internal/crddirectory"

"github.com/truefoundry/elasti/pkg/k8shelper"
"github.com/truefoundry/elasti/pkg/values"
@@ -36,24 +37,47 @@ func (r *ElastiServiceReconciler) handleTargetDeploymentChanges(ctx context.Cont
return nil
}

func (r *ElastiServiceReconciler) handleResolverChanges(ctx context.Context, obj interface{}, serviceName, namespace string) error {
func (r *ElastiServiceReconciler) handleResolverChanges(ctx context.Context, obj interface{}) error {
resolverDeployment := &appsv1.Deployment{}
err := k8shelper.UnstructuredToResource(obj, resolverDeployment)
if err != nil {
return fmt.Errorf("failed to convert unstructured to deployment: %w", err)
}
if resolverDeployment.Name == resolverDeploymentName {
targetNamespacedName := types.NamespacedName{
Name: serviceName,
Namespace: namespace,
if resolverDeployment.Name != resolverDeploymentName {
return nil
}

crddirectory.CRDDirectory.Services.Range(func(key, value interface{}) bool {
crdDetails := value.(*crddirectory.CRDDetails)
if crdDetails.Status.Mode != values.ProxyMode {
return true
}
targetSVC := &v1.Service{}
if err := r.Get(ctx, targetNamespacedName, targetSVC); err != nil {
return fmt.Errorf("failed to get service to update endpointslice: %w", err)

// Extract namespace and service name from the key
keyStr := key.(string)
parts := strings.Split(keyStr, "/")
if len(parts) != 2 {
r.Logger.Error("Invalid key format", zap.String("key", keyStr))
return true
}
if err := r.createOrUpdateEndpointsliceToResolver(ctx, targetSVC); err != nil {
return fmt.Errorf("failed to create or update endpointslice to resolver: %w", err)
namespacedName := types.NamespacedName{
Namespace: parts[0],
Name: parts[1],
}
}

targetService := &v1.Service{}
if err := r.Get(ctx, namespacedName, targetService); err != nil {
r.Logger.Warn("Failed to get service to update EndpointSlice", zap.Error(err))
return true
}

if err := r.createOrUpdateEndpointsliceToResolver(ctx, targetService); err != nil {
r.Logger.Error("Failed to update EndpointSlice",
zap.String("service", crdDetails.CRDName),
zap.Error(err))
}
return true
})

return nil
}
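
The namespace/name split above works because Reconcile registers each service in the CRD directory under svcNamespacedName.String() (see the AddCRD call in elastiservice_controller.go earlier in this commit). Below is a minimal, standalone sketch of that key format, not part of the commit; the service name is illustrative.

package main

import (
	"fmt"
	"strings"

	"k8s.io/apimachinery/pkg/types"
)

func main() {
	// NamespacedName.String() renders as "<namespace>/<name>", which is the
	// key format handleResolverChanges expects when it splits on "/".
	key := types.NamespacedName{Namespace: "elasti", Name: "target-service"}.String()
	fmt.Println(key) // elasti/target-service

	parts := strings.Split(key, "/")
	fmt.Println(len(parts) == 2, parts[0], parts[1]) // true elasti target-service
}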
25 changes: 4 additions & 21 deletions operator/internal/controller/opsInformer.go
@@ -38,36 +38,19 @@ func (r *ElastiServiceReconciler) getMutexKeyForPublicSVC(req ctrl.Request) stri
func (r *ElastiServiceReconciler) getMutexKeyForTargetRef(req ctrl.Request) string {
return req.String() + lockKeyPostfixForTargetRef
}

func (r *ElastiServiceReconciler) getResolverChangeHandler(ctx context.Context, es *v1alpha1.ElastiService, req ctrl.Request) cache.ResourceEventHandlerFuncs {
key := r.InformerManager.GetKey(informer.KeyParams{
Namespace: resolverNamespace,
CRDName: req.Name,
ResourceName: resolverDeploymentName,
Resource: values.KindDeployments,
})
func (r *ElastiServiceReconciler) getResolverChangeHandler(ctx context.Context) cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
errStr := values.Success
err := r.handleResolverChanges(ctx, obj, es.Spec.Service, req.Namespace)
err := r.handleResolverChanges(ctx, obj)
if err != nil {
errStr = err.Error()
r.Logger.Error("Failed to handle resolver changes", zap.Error(err))
} else {
r.Logger.Info("Resolver deployment added", zap.String("deployment_name", resolverDeploymentName), zap.String("es", req.String()))
}
prom.InformerHandlerCounter.WithLabelValues(req.String(), key, errStr).Inc()
},
UpdateFunc: func(_, newObj interface{}) {
errStr := values.Success
err := r.handleResolverChanges(ctx, newObj, es.Spec.Service, req.Namespace)
err := r.handleResolverChanges(ctx, newObj)
if err != nil {
errStr = err.Error()
r.Logger.Error("Failed to handle resolver changes", zap.Error(err))
} else {
r.Logger.Info("Resolver deployment updated", zap.String("deployment_name", resolverDeploymentName), zap.String("es", req.String()))
}
prom.InformerHandlerCounter.WithLabelValues(req.String(), key, errStr).Inc()
},
DeleteFunc: func(_ interface{}) {
// TODO: Handle deletion of resolver deployment
@@ -78,7 +61,7 @@ func (r *ElastiServiceReconciler) getResolverChangeHandler(ctx context.Context,
//
// Another situation is, if the resolver has some issues, and is restarting.
// In that case, we can wait for the resolver to come back up, and in the meanwhile, we can move to the serve mode
r.Logger.Debug("Resolver deployment deleted", zap.String("deployment_name", resolverDeploymentName), zap.String("es", req.String()))
r.Logger.Warn("Resolver deployment deleted", zap.String("deployment_name", resolverDeploymentName))
},
}
}
28 changes: 3 additions & 25 deletions operator/internal/controller/opsModes.go
@@ -7,7 +7,6 @@ import (
"sync"

"truefoundry/elasti/operator/api/v1alpha1"
"truefoundry/elasti/operator/internal/informer"

"github.com/truefoundry/elasti/pkg/values"
"go.uber.org/zap"
@@ -39,7 +38,7 @@ func (r *ElastiServiceReconciler) switchMode(ctx context.Context, req ctrl.Reque
defer r.updateCRDStatus(ctx, req.NamespacedName, mode)
switch mode {
case values.ServeMode:
if err = r.enableServeMode(ctx, req, es); err != nil {
if err = r.enableServeMode(ctx, es); err != nil {
r.Logger.Error("Failed to enable SERVE mode", zap.String("es", req.NamespacedName.String()), zap.Error(err))
return err
}
@@ -82,38 +81,17 @@ func (r *ElastiServiceReconciler) enableProxyMode(ctx context.Context, req ctrl.
}
r.Logger.Info("3. Created or updated endpointslice to resolver", zap.String("service", targetSVC.Name))

// Watch for changes in resolver deployment, and update the endpointslice since we are in proxy mode
if err := r.InformerManager.WatchDeployment(req, resolverDeploymentName, resolverNamespace, r.getResolverChangeHandler(ctx, es, req)); err != nil {
return fmt.Errorf("failed to add watch on resolver deployment: %w", err)
}
r.Logger.Info("4. Added watch on resolver deployment", zap.String("deployment", resolverDeploymentName))

return nil
}

func (r *ElastiServiceReconciler) enableServeMode(ctx context.Context, req ctrl.Request, es *v1alpha1.ElastiService) error {
// TODO: Why are we stopping the watch on resolver deployment if a service moves to serve mode?
// Seems we are creating multiple informers for the resolver deployment when only one would suffice
// Stop the watch on resolver deployment, since we are in serve mode
key := r.InformerManager.GetKey(informer.KeyParams{
Namespace: resolverNamespace,
CRDName: req.Name,
ResourceName: resolverDeploymentName,
Resource: values.KindDeployments,
})
err := r.InformerManager.StopInformer(key)
if err != nil {
r.Logger.Error("Failed to stop watch on resolver deployment", zap.String("deployment", resolverDeploymentName), zap.Error(err))
}
r.Logger.Info("1. Stopped watch on resolver deployment", zap.String("deployment", resolverDeploymentName))

func (r *ElastiServiceReconciler) enableServeMode(ctx context.Context, es *v1alpha1.ElastiService) error {
targetNamespacedName := types.NamespacedName{
Name: es.Spec.Service,
Namespace: es.Namespace,
}
if err := r.deleteEndpointsliceToResolver(ctx, targetNamespacedName); err != nil {
return fmt.Errorf("failed to delete endpointslice to resolver: %w", err)
}
r.Logger.Info("2. Deleted endpointslice to resolver", zap.String("service", targetNamespacedName.String()))
r.Logger.Info("1. Deleted endpointslice to resolver", zap.String("service", targetNamespacedName.String()))
return nil
}
1 change: 1 addition & 0 deletions operator/internal/crddirectory/directory.go
@@ -16,6 +16,7 @@ type Directory struct {
type CRDDetails struct {
CRDName string
Spec v1alpha1.ElastiServiceSpec
Status v1alpha1.ElastiServiceStatus
}

var CRDDirectory *Directory
50 changes: 50 additions & 0 deletions operator/internal/informer/informer.go
@@ -28,13 +28,22 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
)

const (
// TODO: Move to configMap
resolverNamespace = "elasti"
resolverDeploymentName = "elasti-resolver"
resolverServiceName = "elasti-resolver-service"
resolverPort = 8012
)

type (
// Manager helps manage lifecycle of informer
Manager struct {
client *kubernetes.Clientset
dynamicClient *dynamic.DynamicClient
logger *zap.Logger
informers sync.Map
resolver info
resyncPeriod time.Duration
healthCheckDuration time.Duration
healthCheckStopChan chan struct{}
@@ -78,6 +87,47 @@ func NewInformerManager(logger *zap.Logger, kConfig *rest.Config) *Manager {
}
}

func (m *Manager) InitializeResolverInformer(handlers cache.ResourceEventHandlerFuncs) error {
deploymentGVR := schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "deployments",
}

m.resolver.Informer = cache.NewSharedInformer(
&cache.ListWatch{
ListFunc: func(_ metav1.ListOptions) (kRuntime.Object, error) {
return m.dynamicClient.Resource(deploymentGVR).Namespace(resolverNamespace).List(context.Background(), metav1.ListOptions{
FieldSelector: "metadata.name=" + resolverDeploymentName,
})
},
WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) {
return m.dynamicClient.Resource(deploymentGVR).Namespace(resolverNamespace).Watch(context.Background(), metav1.ListOptions{
FieldSelector: "metadata.name=" + resolverDeploymentName,
})
},
},
&unstructured.Unstructured{},
m.resyncPeriod,
)

_, err := m.resolver.Informer.AddEventHandler(handlers)
if err != nil {
m.logger.Error("Failed to add event handler", zap.Error(err))
return fmt.Errorf("failed to add event handler: %w", err)
}

m.resolver.StopCh = make(chan struct{})
go m.resolver.Informer.Run(m.resolver.StopCh)

if !cache.WaitForCacheSync(m.resolver.StopCh, m.resolver.Informer.HasSynced) {
m.logger.Error("Failed to sync informer", zap.String("key", m.getKeyFromRequestWatch(m.resolver.Req)))
return errors.New("failed to sync resolver informer")
}
m.logger.Info("Resolver informer started")
return nil
}

// Start is to initiate a health check on all the running informers
// It uses HasSynced if a informer is not synced, if not, it restarts it
func (m *Manager) Start() {
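
The new resolver field on Manager is of the unexported type info, whose definition is outside this diff. Judging only from the fields touched above (Informer, StopCh, Req), a hypothetical minimal shape would look like the sketch below; the real struct in informer.go may carry more, and the Req type is an assumption.

package informer

import (
	"k8s.io/client-go/tools/cache"
	ctrl "sigs.k8s.io/controller-runtime"
)

// Hypothetical reconstruction of the unexported info type referenced by the
// `resolver info` field above. InitializeResolverInformer only needs the shared
// informer handle, a stop channel, and a request used to build log/metric keys.
type info struct {
	Informer cache.SharedInformer
	StopCh   chan struct{}
	Req      ctrl.Request // assumed type; passed to getKeyFromRequestWatch
}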
