Skip to content

Commit

Permalink
test: test fixes | timeout updated | config moved to env
Browse files Browse the repository at this point in the history
  • Loading branch information
ramantehlan committed Jul 10, 2024
1 parent 2aaceb3 commit b16970f
Show file tree
Hide file tree
Showing 27 changed files with 480 additions and 197 deletions.
2 changes: 0 additions & 2 deletions demo-elastiService.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ metadata:
name: emotion-class-elasti-service
namespace: raman-ws
spec:
queueTimeout: 4
idlePeriod: 20
minTargetReplicas: 2
service: emotion-class-svc
scaleTargetRef:
Expand Down
23 changes: 23 additions & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Internal Documentation
TBA


# File Structure
```
.
├── LICENSE
├── Makefile
├── README.md
├── demo-elastiService.yaml
├── docs
├── go.work
├── go.work.sum
├── install.yaml
├── kustomization.yaml
├── operator
├── pkg
├── playground
├── resolver
└── tests
```
22 changes: 20 additions & 2 deletions install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -539,8 +539,26 @@ spec:
spec:
containers:
- env:
- name: SYSTEM_NAMESPACE
value: elasti
- name: MAX_IDLE_PROXY_CONNS
value: "10"
- name: MAX_IDLE_PROXY_CONNS_PER_HOST
value: "100"
- name: REQ_TIMEOUT
value: "120"
- name: TRAFFIC_RE_ENABLE_DURATION
value: "10"
- name: OPERATOR_RETRY_DURATION
value: "10"
- name: HEADER_FOR_HOST
value: X-Envoy-Decorator-Operation
- name: QUEUE_SIZE
value: "50"
- name: QUEUE_RETRY_DURATION
value: "2"
- name: MAX_QUEUE_CONCURRENCY
value: "5"
- name: INITIAL_CAPACITY
value: "10"
image: ramantehlan/elasti-resolver:v1alpha1
imagePullPolicy: Always
name: playground
Expand Down
8 changes: 4 additions & 4 deletions operator/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ PLATFORMS ?= linux/amd64 #,linux/s390x,linux/ppc64le,linux/arm64,
.PHONY: docker-buildx
docker-buildx: ## Build and push docker image for the manager for cross-platform support
# copy existing Dockerfile and insert --platform=${BUILDPLATFORM} into Dockerfile.cross, and preserve the original Dockerfile
sed -e '1 s/\(^FROM\)/FROM --platform=\$$\{BUILDPLATFORM\}/; t' -e ' 1,// s//FROM --platform=\$$\{BUILDPLATFORM\}/' Dockerfile > Dockerfile.cross
#sed -e '1 s/\(^FROM\)/FROM --platform=\$$\{BUILDPLATFORM\}/; t' -e ' 1,// s//FROM --platform=\$$\{BUILDPLATFORM\}/' Dockerfile > Dockerfile.cross
- $(CONTAINER_TOOL) buildx create --name project-v3-builder
$(CONTAINER_TOOL) buildx use project-v3-builder
- $(CONTAINER_TOOL) buildx build --push --platform=$(PLATFORMS) --tag ${IMG} -f Dockerfile.cross ../
- $(CONTAINER_TOOL) buildx build --push --platform=$(PLATFORMS) --tag ${IMG} -f Dockerfile ../
- $(CONTAINER_TOOL) buildx rm project-v3-builder
rm Dockerfile.cross
#rm Dockerfile.cross

.PHONY: build-installer
build-installer: manifests generate kustomize ## Generate a consolidated YAML with CRDs and deployment.
Expand Down Expand Up @@ -154,7 +154,7 @@ $(LOCALBIN):
## Tool Binaries
KUBECTL ?= kubectl
KUSTOMIZE ?= $(LOCALBIN)/kustomize-$(KUSTOMIZE_VERSION)
CONTROLLER_GEN ?= $(LOCALBIN)/controller-gen-$(CONTROLLER_TOOLS_VERSION)
CONTROLLER_GEN ?= $(LOCALBIN)/controller-gen-$(CONTROLLER_TOOLS_VERSION)
ENVTEST ?= $(LOCALBIN)/setup-envtest-$(ENVTEST_VERSION)
GOLANGCI_LINT = $(LOCALBIN)/golangci-lint-$(GOLANGCI_LINT_VERSION)

Expand Down
4 changes: 1 addition & 3 deletions operator/api/v1alpha1/elastiservice_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ type ElastiServiceSpec struct {
// Important: Run "make" to regenerate code after modifying this file
ScaleTargetRef ScaleTargetRef `json:"scaleTargetRef,omitempty"`
Service string `json:"service,omitempty"`
QTimout int32 `json:"queueTimeout,omitempty"`
IdlePeriod int32 `json:"idlePeriod,omitempty"`
MinTargetReplicas int32 `json:"minTargetReplicas,omitempty"`
MinTargetReplicas int32 `json:"minTargetReplicas,omitempty" default:"1"`
}

type ScaleTargetRef struct {
Expand Down
1 change: 0 additions & 1 deletion operator/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ func init() {
}

const (
// TODO: This can be configured to be sent via config or env
elastiServerPort = ":8013"
)

Expand Down
11 changes: 4 additions & 7 deletions operator/internal/controller/elastiservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,12 @@ func (r *ElastiServiceReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}

// If the ElastiService is being deleted, we need to clean up the resources
if err := r.checkIfCRDIsDeleted(ctx, es, req); err != nil {
if isDeleted, err := r.checkIfCRDIsDeleted(ctx, es, req); err != nil {
r.Logger.Error("Failed to check if CRD is deleted", zap.String("es", req.String()), zap.Error(err))
return res, err
} else if isDeleted {
r.Logger.Info("CRD is deleted", zap.String("es", req.String()))
return res, nil
}

// We also check if the CRD has finalizer, and if not, we add the finalizer
Expand All @@ -87,12 +90,6 @@ func (r *ElastiServiceReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return res, err
}

// Check if Public Service is present, and has not changed from the values in CRDDirectory
if err := r.checkChangesInPublicService(ctx, es, req); err != nil {
r.Logger.Error("Failed to check changes in public service", zap.String("es", req.String()), zap.Error(err))
return res, err
}

// We add the CRD details to service directory, so when elasti server received a request,
// we can find the right resource to scale up
crdDirectory.CRDDirectory.AddCRD(es.Spec.Service, &crdDirectory.CRDDetails{
Expand Down
27 changes: 7 additions & 20 deletions operator/internal/controller/opsCRD.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,13 @@ func (r *ElastiServiceReconciler) checkAndAddCRDFinalizer(ctx context.Context, e
return nil
}

// finalizeCRD reset changes made for the CRD
func (r *ElastiServiceReconciler) finalizeCRD(ctx context.Context, es *v1alpha1.ElastiService, req ctrl.Request) error {
r.Logger.Info("ElastiService is being deleted", zap.String("name", es.Name), zap.Any("deletionTimestamp", es.ObjectMeta.DeletionTimestamp))
// Reset the informer start mutex, so if the ElastiService is recreated, we will need to reset the informer
r.resetMutexForInformer(r.getMutexKeyForTargetRef(req))
r.resetMutexForInformer(r.getMutexKeyForPublicSVC(req))
// Stop all active informers
// NOTE: If the informerManager is shared across multiple controllers, this will stop all informers
// In that case, we must call the
// Stop all active informers related to this CRD
go r.Informer.StopForCRD(req.Name)
// Remove CRD details from service directory
crdDirectory.CRDDirectory.RemoveCRD(es.Spec.Service)
Expand Down Expand Up @@ -134,25 +133,12 @@ func (r *ElastiServiceReconciler) checkChangesInScaleTargetRef(ctx context.Conte
}

// checkChangesInPublicService checks if the Public Service has changed, and makes sure it's not null

func (r *ElastiServiceReconciler) checkChangesInPublicService(ctx context.Context, es *v1alpha1.ElastiService, req ctrl.Request) error {
if es.Spec.Service == "" {
r.Logger.Error("Public Service is not present", zap.String("es", req.String()))
return k8sHelper.ErrNoPublicServiceFound
}

// crd, found := crdDirectory.CRDDirectory.GetCRD(es.Spec.Service)
// if found {
// if crd.Spec.Service != es.Spec.Service {
// r.Logger.Info("Public Service has changed", zap.String("es", req.String()))
// r.Logger.Debug("Stopping informer for public service", zap.String("public service", crd.Spec.Service))
// key := r.Informer.GetKey(req.Namespace, req.Name, crd.Spec.Service, values.KindService)
// r.Informer.StopInformer(key)
// r.Logger.Debug("Resetting mutex for public service", zap.String("public service", crd.Spec.Service))
// r.resetMutexForInformer(r.getMutexKeyForPublicSVC(req))
// }
// }

r.getMutexForInformerStart(r.getMutexKeyForPublicSVC(req)).Do(func() {
r.Informer.Add(&informer.RequestWatch{
Req: req,
Expand All @@ -170,20 +156,21 @@ func (r *ElastiServiceReconciler) checkChangesInPublicService(ctx context.Contex
return nil
}

func (r *ElastiServiceReconciler) checkIfCRDIsDeleted(ctx context.Context, es *v1alpha1.ElastiService, req ctrl.Request) error {
func (r *ElastiServiceReconciler) checkIfCRDIsDeleted(ctx context.Context, es *v1alpha1.ElastiService, req ctrl.Request) (bool, error) {
// If the ElastiService is being deleted, we need to clean up the resources
if !es.ObjectMeta.DeletionTimestamp.IsZero() {
if controllerutil.ContainsFinalizer(es, v1alpha1.ElastiServiceFinalizer) {
// If CRD contains finalizer, we call the finaizer function and remove the finalizer post that
if err := r.finalizeCRD(ctx, es, req); err != nil {
r.Logger.Error("Failed to enable serve mode", zap.String("es", req.String()), zap.Error(err))
return err
return true, err
}
controllerutil.RemoveFinalizer(es, v1alpha1.ElastiServiceFinalizer)
if err := r.Update(ctx, es); err != nil {
return err
return true, err
}
}
return true, nil
}
return nil
return false, nil
}
6 changes: 3 additions & 3 deletions operator/internal/controller/opsEndpointslices.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func (r *ElastiServiceReconciler) createOrUpdateEndpointsliceToResolver(ctx cont
return err
}

// TODO: Suggestion is to give it a random name in end, to avoid any conflicts, which is rare, but possible.
// In case of random name, we need to store the name in CRD.
// NOTE: Suggestion is to give it a random name in end, to avoid any conflicts, which is rare, but possible.
// In case of random name, we need to store the name in CRD. Right now, we provide a deterministic hashed name.
newEndpointsliceToResolverName := utils.GetEndpointSliceToResolverName(service.Name)
EndpointsliceNamespacedName := types.NamespacedName{
Name: newEndpointsliceToResolverName,
Expand Down Expand Up @@ -116,7 +116,7 @@ func (r *ElastiServiceReconciler) createOrUpdateEndpointsliceToResolver(ctx cont
}
r.Logger.Info("EndpointSlice updated successfully", zap.String("endpointslice", EndpointsliceNamespacedName.String()))
} else {
// TODO: Make sure the private service is owned by the ElastiService
// TODOS: Make sure the private service is owned by the ElastiService
if err := r.Create(ctx, newEndpointSlice); err != nil {
r.Logger.Error("failed to create sliceToResolver", zap.String("endpointslice", EndpointsliceNamespacedName.String()), zap.Error(err))
return err
Expand Down
7 changes: 7 additions & 0 deletions operator/internal/controller/opsModes.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ func (r *ElastiServiceReconciler) enableProxyMode(ctx context.Context, req ctrl.
if err != nil {
return err
}

// Check if Public Service is present, and has not changed from the values in CRDDirectory
if err := r.checkChangesInPublicService(ctx, es, req); err != nil {
r.Logger.Error("Failed to check changes in public service", zap.String("es", req.String()), zap.Error(err))
return err
}

if err = r.createOrUpdateEndpointsliceToResolver(ctx, targetSVC); err != nil {
return err
}
Expand Down
49 changes: 31 additions & 18 deletions operator/internal/controller/opsServices.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,28 +67,41 @@ func (r *ElastiServiceReconciler) checkAndCreatePrivateService(ctx context.Conte
return PVTName, nil
}

func (r *ElastiServiceReconciler) handlePublicServiceChanges(ctx context.Context, obj interface{}, serviceName, namespace string) {
publicService := &v1.Service{}
err := k8sHelper.UnstructuredToResource(obj, publicService)
// handlePublicServiceChanges handles the changes in the public service, and sync those changes in the private service
func (r *ElastiServiceReconciler) handlePublicServiceChanges(ctx context.Context, obj interface{}, serviceName, _ string) {
publicSVC := &v1.Service{}
err := k8sHelper.UnstructuredToResource(obj, publicSVC)
if err != nil {
r.Logger.Error("Failed to convert unstructured to service", zap.Error(err))
return
}

if publicService.Name == serviceName {
targetNamespacedName := types.NamespacedName{
Name: serviceName,
Namespace: namespace,
}
targetSVC := &v1.Service{}
if err := r.Get(ctx, targetNamespacedName, targetSVC); err != nil {
r.Logger.Error("Failed to get service to update endpointslice", zap.String("service", targetNamespacedName.String()), zap.Error(err))
return
}
if err := r.createOrUpdateEndpointsliceToResolver(ctx, targetSVC); err != nil {
r.Logger.Error("Failed to create or update endpointslice to resolver", zap.String("service", targetNamespacedName.String()), zap.Error(err))
return
}
// Check if the service is same as mentioned in CRD
if publicSVC.Name != serviceName {
r.Logger.Debug("Public service is not same as mentioned in CRD. Informer misconfigured.", zap.String("service", publicSVC.Name))
return
}
r.Logger.Info("Public service changed", zap.String("service", publicService.Name))
// Get Private Service
PVTName := utils.GetPrivateSerivceName(publicSVC.Name)
privateServiceNamespacedName := types.NamespacedName{Name: PVTName, Namespace: publicSVC.Namespace}
privateSVC := &v1.Service{}
if err := r.Get(ctx, privateServiceNamespacedName, privateSVC); err != nil && !errors.IsNotFound(err) {
r.Logger.Error("Failed to get private service", zap.Error(err))
return
} else if errors.IsNotFound(err) {
r.Logger.Error("Private service not found", zap.String("private-service", privateServiceNamespacedName.String()))
return
}

// Sync the changes in private service
privateSVC.Spec.Ports = publicSVC.Spec.Ports
privateSVC.Spec.Selector = publicSVC.Spec.Selector

// Update the private service
if err := r.Update(ctx, privateSVC); err != nil {
r.Logger.Error("Failed to update private service", zap.String("private-service", privateServiceNamespacedName.String()), zap.Error(err))
return
}

r.Logger.Info("Public service changed. Private service synced.", zap.String("service", publicSVC.Name))
}
27 changes: 18 additions & 9 deletions operator/internal/elastiServer/elastiServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package elastiServer
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"sync"

"k8s.io/client-go/rest"

Expand All @@ -24,8 +25,9 @@ type (
// It is used by components about certain events, like when resolver receive the request
// for a service, that service is scaled up if it's at 0 replicas
Server struct {
logger *zap.Logger
k8sHelper *k8sHelper.Ops
logger *zap.Logger
k8sHelper *k8sHelper.Ops
scaleLocks sync.Map
}
)

Expand Down Expand Up @@ -93,22 +95,29 @@ func (s *Server) resolverReqHandler(w http.ResponseWriter, req *http.Request) {
}
err = s.scaleTargetForService(ctx, body.Svc, body.Namespace)
if err != nil {
s.logger.Error("Failed to compare and scale deployment", zap.Error(err))
s.logger.Error("Failed to compare and scale target", zap.Error(err))
return
}
s.logger.Info("-- Received fulfilled from Resolver", zap.Any("body", body))
}

// scaleTargetForService scales up the target workload of the given service
// when it is at zero replicas. A per-service mutex serializes concurrent
// scale attempts for the same service so only one scale-up runs at a time.
// The context parameter is currently unused.
func (s *Server) scaleTargetForService(_ context.Context, serviceName, namespace string) error {
	scaleMutex := s.getMutexForServiceScale(serviceName)
	scaleMutex.Lock()
	// Defers run LIFO: the mutex is released first, then the release is logged.
	defer s.logger.Debug("Scale target lock released")
	defer scaleMutex.Unlock()
	s.logger.Debug("Scale target lock taken")
	crd, found := crdDirectory.CRDDirectory.GetCRD(serviceName)
	if !found {
		s.logger.Error("Failed to get CRD details from directory")
		return fmt.Errorf("scaleTargetForService - error: failed to get CRD details from directory, serviceName: %s", serviceName)
	}
	if err := s.k8sHelper.ScaleTargetWhenAtZero(namespace, crd.Spec.ScaleTargetRef.Name, crd.Spec.ScaleTargetRef.Kind, crd.Spec.MinTargetReplicas); err != nil {
		s.logger.Error("Failed to scale TargetRef", zap.Any("TargetRef", crd.Spec.ScaleTargetRef), zap.Error(err))
		return fmt.Errorf("scaleTargetForService - error: %w, targetRefKind: %s, targetRefName: %s", err, crd.Spec.ScaleTargetRef.Kind, crd.Spec.ScaleTargetRef.Name)
	}
	s.logger.Info("TargetRef is scaled up", zap.Any("TargetRef", crd.Spec.ScaleTargetRef))
	return nil
}

// getMutexForServiceScale returns the mutex that serializes scale-up
// operations for the given service, creating and caching one on first use.
func (s *Server) getMutexForServiceScale(serviceName string) *sync.Mutex {
	entry, _ := s.scaleLocks.LoadOrStore(serviceName, &sync.Mutex{})
	mu := entry.(*sync.Mutex)
	return mu
}
4 changes: 2 additions & 2 deletions operator/internal/informer/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,17 +248,17 @@ func (m *Manager) enableInformer(req *RequestWatch) {
// getKeyFromRequestWatch builds the informer-map key for a watch request.
// Key format: crdName/namespace/resourceType/resourceName (all lowercased).
func (m *Manager) getKeyFromRequestWatch(req *RequestWatch) string {
	return fmt.Sprintf("%s/%s/%s/%s",
		strings.ToLower(req.Req.Name),                      // CRD Name
		strings.ToLower(req.ResourceNamespace),             // Namespace
		strings.ToLower(req.GroupVersionResource.Resource), // Resource Type
		strings.ToLower(req.ResourceName))                  // Resource Name
}

// GetKey builds the informer-map key from the namespace, CRD name, resource
// type, and resource name.
// Key format: crdName/namespace/resourceType/resourceName (all lowercased).
func (m *Manager) GetKey(namespace, crdName, resource, resourceName string) string {
	return fmt.Sprintf("%s/%s/%s/%s",
		strings.ToLower(crdName),      // CRD Name
		strings.ToLower(namespace),    // Namespace
		strings.ToLower(resource),     // Resource Type
		strings.ToLower(resourceName)) // Resource Name
}
Loading

0 comments on commit b16970f

Please sign in to comment.