Commit
refactor: persist collectors memory settings in collectorsgroup CRD (#1824)

This PR makes it so:
- Memory settings for the collectors are calculated by the scheduler and written into the collectors group CRD.
- The values can easily be seen where they belong logically, in each collectors group.
- After the refactor, the autoscaler no longer depends on or watches the odigos config resource for changes.
- Any change to the memory settings is written to the collectors group CRD, which the autoscaler already watches and reconciles.

It aims to simplify the reconciliation process in the autoscaler and make it depend on one less resource. Config computations are offloaded to the scheduler, so the autoscaler should be triggered less often.

This PR is a pure refactor and does not change any existing behavior. Follow-up PRs will introduce changes and additions, and apply memory settings to the node collector as well.
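
For readers following the refactor, here is a minimal sketch of the kind of computation the scheduler now owns, derived only from the guidance documented in the new CRD fields (80% of the hard limit for GOMEMLIMIT, 20% for the spike limit, roughly 50MiB of process overhead). The function name, the base value, and the exact formula are illustrative assumptions, not the scheduler code from this PR:

```go
// Sketch only: the scheduler-side logic is not part of the hunks shown below, and the
// derivation here follows the guidance documented in the new CRD fields. Names and the
// exact formula are assumptions for illustration.
package main

import (
	"fmt"

	odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
)

// deriveMemorySettings computes the raw values that get persisted in the
// CollectorsGroup spec, starting from a configured memory request in MiB.
func deriveMemorySettings(memoryRequestMiB int) odigosv1.CollectorsGroupMemorySettings {
	// Hard limit for the memory limiter: leave ~50MiB of headroom for non-heap
	// process memory, per the memory limiter docs quoted in the CRD descriptions.
	limitMiB := memoryRequestMiB - 50

	return odigosv1.CollectorsGroupMemorySettings{
		MemoryRequestMiB:           memoryRequestMiB,
		MemoryLimiterLimitMiB:      limitMiB,
		MemoryLimiterSpikeLimitMiB: limitMiB * 20 / 100, // recommended: 20% of the hard limit
		GomemlimitMiB:              limitMiB * 80 / 100, // recommended: 80% of the hard limit
	}
}

func main() {
	// 512Mi request -> 462 hard limit, 92 spike limit, 369 GOMEMLIMIT.
	fmt.Printf("%+v\n", deriveMemorySettings(512))
}
```

Whatever formula the scheduler ends up using, the autoscaler consumes the resulting values as-is from the CollectorsGroup spec, as the diffs below show.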
blumamir authored Nov 22, 2024
1 parent 1e6a095 commit d6df850
Showing 16 changed files with 242 additions and 97 deletions.
43 changes: 43 additions & 0 deletions api/config/crd/bases/odigos.io_collectorsgroups.yaml
@@ -48,13 +48,56 @@ spec:
This can be used to resolve conflicting ports when a collector is using the host network.
format: int32
type: integer
memorySettings:
description: |-
Memory settings for the collectors group.
these settings are used to protect the collector instances from:
- running out of memory and being killed by the k8s OOM killer
- consuming all available memory on the node which can lead to node instability
- pushing back pressure to the instrumented applications
properties:
gomemlimitMiB:
description: |-
the GOMEMLIMIT environment variable value for the collector pod.
this is the limit at which the Go runtime will start garbage collection.
it is recommended to be set to 80% of the hard limit of the memory limiter.
type: integer
memoryLimiterLimitMiB:
description: |-
this parameter sets the "limit_mib" parameter in the memory limiter configuration for the collector.
it is the hard limit after which a forced garbage collection will be performed.
this value is compared against the heap Alloc value reported by the Go runtime.
According to the memory limiter docs:
> Note that typically the total memory usage of process will be about 50MiB higher than this value
a test from Nov 2024 showed that a fresh odigos collector with no traffic takes 38MiB,
thus 50MiB of headroom is a good value to start with.
type: integer
memoryLimiterSpikeLimitMiB:
description: |-
this parameter sets the "spike_limit_mib" parameter in the memory limiter configuration for the collector.
note that this is not the processor soft limit itself, but the difference in MiB between the hard limit and the soft limit.
according to the memory limiter docs, it is recommended to set this to 20% of the hard limit.
changing this value allows trade-offs between memory usage and resiliency to bursts.
type: integer
memoryRequestMiB:
description: |-
MemoryRequestMiB is the memory resource request to be used on the pod template.
it will be embedded in the pod template as a resource request of the form "memory: <value>Mi"
type: integer
required:
- gomemlimitMiB
- memoryLimiterLimitMiB
- memoryLimiterSpikeLimitMiB
- memoryRequestMiB
type: object
role:
enum:
- CLUSTER_GATEWAY
- NODE_COLLECTOR
type: string
required:
- collectorOwnMetricsPort
- memorySettings
- role
type: object
status:


2 changes: 2 additions & 0 deletions api/generated/odigos/applyconfiguration/utils.go

Some generated files are not rendered by default.

38 changes: 38 additions & 0 deletions api/odigos/v1alpha1/collectorsgroup_types.go
@@ -30,13 +30,51 @@ const (
CollectorsGroupRoleNodeCollector CollectorsGroupRole = CollectorsGroupRole(k8sconsts.CollectorsRoleNodeCollector)
)

// The raw values of the memory settings for the collectors group.
// any defaulting, validations and calculations should be done in the controllers
// that create this CR.
// Values will be used as is without any further processing.
type CollectorsGroupMemorySettings struct {

// MemoryRequestMiB is the memory resource request to be used on the pod template.
// it will be embedded in the pod template as a resource request of the form "memory: <value>Mi"
MemoryRequestMiB int `json:"memoryRequestMiB"`

// this parameter sets the "limit_mib" parameter in the memory limiter configuration for the collector.
// it is the hard limit after which a forced garbage collection will be performed.
// this value is compared against the heap Alloc value reported by the Go runtime.
// According to the memory limiter docs:
// > Note that typically the total memory usage of process will be about 50MiB higher than this value
// a test from Nov 2024 showed that a fresh odigos collector with no traffic takes 38MiB,
// thus 50MiB of headroom is a good value to start with.
MemoryLimiterLimitMiB int `json:"memoryLimiterLimitMiB"`

// this parameter sets the "spike_limit_mib" parameter in the memory limiter configuration for the collector.
// note that this is not the processor soft limit itself, but the difference in MiB between the hard limit and the soft limit.
// according to the memory limiter docs, it is recommended to set this to 20% of the hard limit.
// changing this value allows trade-offs between memory usage and resiliency to bursts.
MemoryLimiterSpikeLimitMiB int `json:"memoryLimiterSpikeLimitMiB"`

// the GOMEMLIMIT environment variable value for the collector pod.
// this is the limit at which the Go runtime will start garbage collection.
// it is recommended to be set to 80% of the hard limit of the memory limiter.
GomemlimitMiB int `json:"gomemlimitMiB"`
}

// CollectorsGroupSpec defines the desired state of Collector
type CollectorsGroupSpec struct {
Role CollectorsGroupRole `json:"role"`

// The port to use for exposing the collector's own metrics as a prometheus endpoint.
// This can be used to resolve conflicting ports when a collector is using the host network.
CollectorOwnMetricsPort int32 `json:"collectorOwnMetricsPort"`

// Memory settings for the collectors group.
// these settings are used to protect the collector instances from:
// - running out of memory and being killed by the k8s OOM killer
// - consuming all available memory on the node which can lead to node instability
// - pushing back pressure to the instrumented applications
MemorySettings CollectorsGroupMemorySettings `json:"memorySettings"`
}

// CollectorsGroupStatus defines the observed state of Collector
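
To make the new spec shape concrete, here is an illustrative, fully-populated CollectorsGroup value using these types. The name, namespace, port, and numbers are hypothetical; the percentages simply mirror the guidance in the field comments above:

```go
// Illustration only: a fully-populated CollectorsGroup using the new types. The name,
// namespace, port, and numbers are hypothetical and just follow the 80%/20% guidance
// from the field comments; they are not values produced by this PR.
package main

import (
	"fmt"

	odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	gatewayGroup := odigosv1.CollectorsGroup{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "odigos-gateway", // hypothetical name
			Namespace: "odigos-system",  // hypothetical namespace
		},
		Spec: odigosv1.CollectorsGroupSpec{
			Role:                    odigosv1.CollectorsGroupRoleClusterGateway,
			CollectorOwnMetricsPort: 8888, // hypothetical port
			MemorySettings: odigosv1.CollectorsGroupMemorySettings{
				MemoryRequestMiB:           512,
				MemoryLimiterLimitMiB:      462, // request minus ~50MiB of overhead
				MemoryLimiterSpikeLimitMiB: 92,  // 20% of the hard limit
				GomemlimitMiB:              369, // 80% of the hard limit
			},
		},
	}
	fmt.Printf("%+v\n", gatewayGroup.Spec)
}
```
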
16 changes: 16 additions & 0 deletions api/odigos/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions autoscaler/controllers/gateway/configmap.go
@@ -111,13 +111,13 @@ func addSelfTelemetryPipeline(c *config.Config, ownTelemetryPort int32) error {
return nil
}

func syncConfigMap(dests *odigosv1.DestinationList, allProcessors *odigosv1.ProcessorList, gateway *odigosv1.CollectorsGroup, ctx context.Context, c client.Client, scheme *runtime.Scheme, memConfig *memoryConfigurations) (string, []odigoscommon.ObservabilitySignal, error) {
func syncConfigMap(dests *odigosv1.DestinationList, allProcessors *odigosv1.ProcessorList, gateway *odigosv1.CollectorsGroup, ctx context.Context, c client.Client, scheme *runtime.Scheme) (string, []odigoscommon.ObservabilitySignal, error) {
logger := log.FromContext(ctx)

memoryLimiterConfiguration := config.GenericMap{
"check_interval": "1s",
"limit_mib": memConfig.memoryLimiterLimitMiB,
"spike_limit_mib": memConfig.memoryLimiterSpikeLimitMiB,
"limit_mib": gateway.Spec.MemorySettings.MemoryLimiterLimitMiB,
"spike_limit_mib": gateway.Spec.MemorySettings.MemoryLimiterSpikeLimitMiB,
}

processors := common.FilterAndSortProcessorsByOrderHint(allProcessors, odigosv1.CollectorsGroupRoleClusterGateway)
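
Since "spike_limit_mib" is a delta rather than the soft limit itself (see the field comments above), here is a small sketch of how the effective soft limit falls out of the two persisted values, using the hypothetical numbers from the earlier examples:

```go
// Sketch: the relationship between the two persisted memory limiter values and the
// effective soft limit, using the hypothetical numbers from the earlier examples.
package main

import "fmt"

func main() {
	limitMiB := 462     // "limit_mib": Spec.MemorySettings.MemoryLimiterLimitMiB (hard limit)
	spikeLimitMiB := 92 // "spike_limit_mib": Spec.MemorySettings.MemoryLimiterSpikeLimitMiB (delta)

	// The memory limiter starts refusing data (pushing back pressure) at the soft
	// limit, which is the hard limit minus the spike limit.
	softLimitMiB := limitMiB - spikeLimitMiB

	fmt.Printf("hard limit: %dMiB, soft limit: %dMiB\n", limitMiB, softLimitMiB)
}
```
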
10 changes: 5 additions & 5 deletions autoscaler/controllers/gateway/deployment.go
@@ -34,7 +34,7 @@ const (
)

func syncDeployment(dests *odigosv1.DestinationList, gateway *odigosv1.CollectorsGroup, configData string,
ctx context.Context, c client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string, memConfig *memoryConfigurations) (*appsv1.Deployment, error) {
ctx context.Context, c client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string) (*appsv1.Deployment, error) {
logger := log.FromContext(ctx)

secretsVersionHash, err := destinationsSecretsVersionsHash(ctx, c, dests)
@@ -44,7 +44,7 @@ func syncDeployment(dests *odigosv1.DestinationList, gateway *odigosv1.Collector

// Calculate the hash of the config data and the secrets version hash, this is used to make sure the gateway will restart when the config changes
configDataHash := common.Sha256Hash(fmt.Sprintf("%s-%s", configData, secretsVersionHash))
desiredDeployment, err := getDesiredDeployment(dests, configDataHash, gateway, scheme, imagePullSecrets, odigosVersion, memConfig)
desiredDeployment, err := getDesiredDeployment(dests, configDataHash, gateway, scheme, imagePullSecrets, odigosVersion)
if err != nil {
return nil, errors.Join(err, errors.New("failed to get desired deployment"))
}
@@ -88,9 +88,9 @@ func patchDeployment(existing *appsv1.Deployment, desired *appsv1.Deployment, ct
}

func getDesiredDeployment(dests *odigosv1.DestinationList, configDataHash string,
gateway *odigosv1.CollectorsGroup, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string, memConfig *memoryConfigurations) (*appsv1.Deployment, error) {
gateway *odigosv1.CollectorsGroup, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string) (*appsv1.Deployment, error) {

requestMemoryQuantity := resource.MustParse(fmt.Sprintf("%dMi", memConfig.memoryRequestMiB))
requestMemoryQuantity := resource.MustParse(fmt.Sprintf("%dMi", gateway.Spec.MemorySettings.MemoryRequestMiB))

desiredDeployment := &appsv1.Deployment{
ObjectMeta: v1.ObjectMeta{
@@ -158,7 +158,7 @@ func getDesiredDeployment(dests *odigosv1.DestinationList, configDataHash string
},
{
Name: "GOMEMLIMIT",
Value: fmt.Sprintf("%dMiB", memConfig.gomemlimitMiB),
Value: fmt.Sprintf("%dMiB", gateway.Spec.MemorySettings.GomemlimitMiB),
},
},
SecurityContext: &corev1.SecurityContext{
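
A condensed sketch of how the persisted values surface in the gateway pod template, mirroring the resource.MustParse and GOMEMLIMIT snippets above. The container shown here is simplified and is not the full deployment built by getDesiredDeployment:

```go
// Condensed sketch: how the persisted values end up in the pod template, mirroring the
// snippets in deployment.go. The container is simplified; the real deployment sets many
// more fields (image, args, ports, probes, etc.).
package main

import (
	"fmt"

	odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func collectorContainer(gateway *odigosv1.CollectorsGroup) corev1.Container {
	// "memory: <value>Mi" resource request, taken as-is from the CollectorsGroup spec.
	requestMemoryQuantity := resource.MustParse(fmt.Sprintf("%dMi", gateway.Spec.MemorySettings.MemoryRequestMiB))

	return corev1.Container{
		Name: "gateway", // simplified
		Resources: corev1.ResourceRequirements{
			Requests: corev1.ResourceList{
				corev1.ResourceMemory: requestMemoryQuantity,
			},
		},
		Env: []corev1.EnvVar{
			{
				// GOMEMLIMIT accepts a "MiB" suffix, e.g. "369MiB".
				Name:  "GOMEMLIMIT",
				Value: fmt.Sprintf("%dMiB", gateway.Spec.MemorySettings.GomemlimitMiB),
			},
		},
	}
}

func main() {
	gw := &odigosv1.CollectorsGroup{}
	gw.Spec.MemorySettings = odigosv1.CollectorsGroupMemorySettings{MemoryRequestMiB: 512, GomemlimitMiB: 369}
	fmt.Printf("%+v\n", collectorContainer(gw).Resources.Requests)
}
```
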
4 changes: 2 additions & 2 deletions autoscaler/controllers/gateway/hpa.go
@@ -30,12 +30,12 @@ var (
stabilizationWindowSeconds = intPtr(300) // cooldown period for scaling down
)

func syncHPA(gateway *odigosv1.CollectorsGroup, ctx context.Context, c client.Client, scheme *runtime.Scheme, memConfig *memoryConfigurations, kubeVersion *version.Version) error {
func syncHPA(gateway *odigosv1.CollectorsGroup, ctx context.Context, c client.Client, scheme *runtime.Scheme, kubeVersion *version.Version) error {
logger := log.FromContext(ctx)

var hpa client.Object

memLimit := memConfig.gomemlimitMiB * memoryLimitPercentageForHPA / 100.0
memLimit := gateway.Spec.MemorySettings.GomemlimitMiB * memoryLimitPercentageForHPA / 100.0
metricQuantity := resource.MustParse(fmt.Sprintf("%dMi", memLimit))

switch {
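
The HPA object itself is constructed further down in hpa.go and is not part of this hunk. As an illustration only, a memory target derived from GOMEMLIMIT typically ends up as an AverageValue resource metric like the following; the value of memoryLimitPercentageForHPA is an assumption here:

```go
// Sketch only: the HPA object is constructed further down in hpa.go and is not shown in
// this hunk. This illustrates the typical shape of a memory AverageValue target derived
// from GOMEMLIMIT; the value of memoryLimitPercentageForHPA is an assumption.
package main

import (
	"fmt"

	autoscalingv2 "k8s.io/api/autoscaling/v2"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	gomemlimitMiB := 369              // hypothetical Spec.MemorySettings.GomemlimitMiB
	memoryLimitPercentageForHPA := 75 // assumption: the real constant lives in hpa.go
	memLimit := gomemlimitMiB * memoryLimitPercentageForHPA / 100
	metricQuantity := resource.MustParse(fmt.Sprintf("%dMi", memLimit))

	// Scale the gateway on average memory usage per pod, relative to the GOMEMLIMIT-derived target.
	metric := autoscalingv2.MetricSpec{
		Type: autoscalingv2.ResourceMetricSourceType,
		Resource: &autoscalingv2.ResourceMetricSource{
			Name: corev1.ResourceMemory,
			Target: autoscalingv2.MetricTarget{
				Type:         autoscalingv2.AverageValueMetricType,
				AverageValue: &metricQuantity,
			},
		},
	}
	fmt.Printf("hpa memory metric: %+v\n", metric)
}
```
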
21 changes: 5 additions & 16 deletions autoscaler/controllers/gateway/root.go
@@ -6,10 +6,8 @@ import (
odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
commonconf "github.com/odigos-io/odigos/autoscaler/controllers/common"
controllerconfig "github.com/odigos-io/odigos/autoscaler/controllers/controller_config"
odigoscommon "github.com/odigos-io/odigos/common"
k8sconsts "github.com/odigos-io/odigos/k8sutils/pkg/consts"
"github.com/odigos-io/odigos/k8sutils/pkg/env"
"github.com/odigos-io/odigos/k8sutils/pkg/utils"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -49,14 +49,7 @@ func Sync(ctx context.Context, k8sClient client.Client, scheme *runtime.Scheme,
// Add the generic batch processor to the list of processors
processors.Items = append(processors.Items, commonconf.GetGenericBatchProcessor())

odigosConfig, err := utils.GetCurrentOdigosConfig(ctx, k8sClient)
if err != nil {
logger.Error(err, "failed to get odigos config")
return err
}

err = syncGateway(&dests, &processors, &gatewayCollectorGroup, ctx, k8sClient, scheme, imagePullSecrets, odigosVersion, &odigosConfig,
config)
err = syncGateway(&dests, &processors, &gatewayCollectorGroup, ctx, k8sClient, scheme, imagePullSecrets, odigosVersion, config)
statusPatchString := commonconf.GetCollectorsGroupDeployedConditionsPatch(err)
statusErr := k8sClient.Status().Patch(ctx, &gatewayCollectorGroup, client.RawPatch(types.MergePatchType, []byte(statusPatchString)))
if statusErr != nil {
@@ -68,14 +68,12 @@ func syncGateway(dests *odigosv1.DestinationList, processors *odigosv1.ProcessorList,

func syncGateway(dests *odigosv1.DestinationList, processors *odigosv1.ProcessorList,
gateway *odigosv1.CollectorsGroup, ctx context.Context,
c client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string, odigosConfig *odigoscommon.OdigosConfiguration,
c client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string,
config *controllerconfig.ControllerConfig) error {
logger := log.FromContext(ctx)
logger.V(0).Info("Syncing gateway")

memConfig := getMemoryConfigurations(odigosConfig)

configData, signals, err := syncConfigMap(dests, processors, gateway, ctx, c, scheme, memConfig)
configData, signals, err := syncConfigMap(dests, processors, gateway, ctx, c, scheme)
if err != nil {
logger.Error(err, "Failed to sync config map")
return err
Expand All @@ -93,7 +82,7 @@ func syncGateway(dests *odigosv1.DestinationList, processors *odigosv1.Processor
return err
}

_, err = syncDeployment(dests, gateway, configData, ctx, c, scheme, imagePullSecrets, odigosVersion, memConfig)
_, err = syncDeployment(dests, gateway, configData, ctx, c, scheme, imagePullSecrets, odigosVersion)
if err != nil {
logger.Error(err, "Failed to sync deployment")
return err
@@ -105,7 +105,7 @@ func syncGateway(dests *odigosv1.DestinationList, processors *odigosv1.Processor
return err
}

err = syncHPA(gateway, ctx, c, scheme, memConfig, config.K8sVersion)
err = syncHPA(gateway, ctx, c, scheme, config.K8sVersion)
if err != nil {
logger.Error(err, "Failed to sync HPA")
}
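
To illustrate the end state described in the PR text (the autoscaler keys off the CollectorsGroup CRD and no longer watches the odigos config), here is a generic controller-runtime sketch; the reconciler and setup function are illustrative and are not code from this PR:

```go
// Sketch only, not code from this PR: a generic controller-runtime setup showing the end
// state described above -- reconciliation is driven by the CollectorsGroup CRD, with no
// watch on the odigos configuration resource. Types and names here are illustrative.
package gatewaysketch

import (
	"context"

	odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
	ctrl "sigs.k8s.io/controller-runtime"
)

type gatewayReconciler struct{}

func (r *gatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Memory settings arrive via the CollectorsGroup object that triggered this
	// reconcile, so no separate lookup (or watch) of the odigos config is needed.
	return ctrl.Result{}, nil
}

func SetupWithManager(mgr ctrl.Manager) error {
	// Watching the CollectorsGroup CRD is enough to pick up memory-setting changes,
	// since the scheduler persists any recomputed values there.
	return ctrl.NewControllerManagedBy(mgr).
		For(&odigosv1.CollectorsGroup{}).
		Complete(&gatewayReconciler{})
}
```
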
