Skip to content

Commit

Permalink
[release-v1.15] Upgrade Eventing libs to latest 1.15 (#1318)
Browse files Browse the repository at this point in the history
* Upgrade Eventing libs to latest 1.15

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Fix consumer group build error and remove unused configs

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Regenerate release

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

---------

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi authored Oct 31, 2024
1 parent d6ff73e commit df95c16
Show file tree
Hide file tree
Showing 30 changed files with 349 additions and 2,748 deletions.

This file was deleted.

This file was deleted.

86 changes: 8 additions & 78 deletions control-plane/pkg/reconciler/consumergroup/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ package consumergroup

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/kelseyhightower/envconfig"
"go.uber.org/multierr"
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -44,7 +42,6 @@ import (
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
nodeinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/node"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod/filtered"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
"knative.dev/pkg/configmap"
Expand Down Expand Up @@ -92,11 +89,9 @@ type envConfig struct {
}

type SchedulerConfig struct {
StatefulSetName string
RefreshPeriod time.Duration
Capacity int32
SchedulerPolicy *scheduler.SchedulerPolicy
DeSchedulerPolicy *scheduler.SchedulerPolicy
StatefulSetName string
RefreshPeriod time.Duration
Capacity int32
}

func NewController(ctx context.Context, watcher configmap.Watcher) *controller.Impl {
Expand All @@ -111,10 +106,8 @@ func NewController(ctx context.Context, watcher configmap.Watcher) *controller.I
dispatcherPodLister := dispatcherPodInformer.Lister()

c := SchedulerConfig{
RefreshPeriod: time.Duration(env.SchedulerRefreshPeriod) * time.Second,
Capacity: env.PodCapacity,
SchedulerPolicy: schedulerPolicyFromConfigMapOrFail(ctx, env.SchedulerPolicyConfigMap),
DeSchedulerPolicy: schedulerPolicyFromConfigMapOrFail(ctx, env.DeSchedulerPolicyConfigMap),
RefreshPeriod: time.Duration(env.SchedulerRefreshPeriod) * time.Second,
Capacity: env.PodCapacity,
}

schedulers := map[string]Scheduler{
Expand Down Expand Up @@ -333,11 +326,9 @@ func createKafkaScheduler(ctx context.Context, c SchedulerConfig, podLister core
ctx,
podLister,
SchedulerConfig{
StatefulSetName: ssName,
RefreshPeriod: c.RefreshPeriod,
Capacity: c.Capacity,
SchedulerPolicy: c.SchedulerPolicy,
DeSchedulerPolicy: c.DeSchedulerPolicy,
StatefulSetName: ssName,
RefreshPeriod: c.RefreshPeriod,
Capacity: c.Capacity,
},
func() ([]scheduler.VPod, error) {
consumerGroups, err := lister.List(labels.SelectorFromSet(getSelectorLabel(ssName)))
Expand Down Expand Up @@ -380,12 +371,8 @@ func createStatefulSetScheduler(ctx context.Context, podLister corelisters.PodLi
ScaleCacheConfig: scheduler.ScaleCacheConfig{RefreshPeriod: statefulSetScaleCacheRefreshPeriod},
PodCapacity: c.Capacity,
RefreshPeriod: c.RefreshPeriod,
SchedulerPolicy: scheduler.MAXFILLUP,
SchedPolicy: c.SchedulerPolicy,
DeschedPolicy: c.DeSchedulerPolicy,
Evictor: newEvictor(ctx, zap.String("kafka.eventing.knative.dev/component", "evictor")).evict,
VPodLister: lister,
NodeLister: nodeinformer.Get(ctx).Lister(),
PodLister: podLister.Pods(system.Namespace()),
})

Expand All @@ -394,60 +381,3 @@ func createStatefulSetScheduler(ctx context.Context, podLister corelisters.PodLi
SchedulerConfig: c,
}
}

// schedulerPolicyFromConfigMapOrFail reads predicates and priorities data from configMap
func schedulerPolicyFromConfigMapOrFail(ctx context.Context, configMapName string) *scheduler.SchedulerPolicy {
p, err := schedulerPolicyFromConfigMap(ctx, configMapName)
if err != nil {
logging.FromContext(ctx).Fatal(zap.Error(err))
}
return p
}

// schedulerPolicyFromConfigMap reads predicates and priorities data from configMap
func schedulerPolicyFromConfigMap(ctx context.Context, configMapName string) (*scheduler.SchedulerPolicy, error) {
policyConfigMap, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(ctx, configMapName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("couldn't get scheduler policy config map %s/%s: %v", system.Namespace(), configMapName, err)
}

logger := logging.FromContext(ctx).
Desugar().
With(zap.String("configmap", configMapName))
policy := &scheduler.SchedulerPolicy{}

preds, found := policyConfigMap.Data["predicates"]
if !found {
return nil, fmt.Errorf("missing policy config map %s/%s value at key predicates", system.Namespace(), configMapName)
}
if err := json.NewDecoder(strings.NewReader(preds)).Decode(&policy.Predicates); err != nil {
return nil, fmt.Errorf("invalid policy %v: %v", preds, err)
}

priors, found := policyConfigMap.Data["priorities"]
if !found {
return nil, fmt.Errorf("missing policy config map value at key priorities")
}
if err := json.NewDecoder(strings.NewReader(priors)).Decode(&policy.Priorities); err != nil {
return nil, fmt.Errorf("invalid policy %v: %v", preds, err)
}

if errs := validatePolicy(policy); errs != nil {
return nil, multierr.Combine(err)
}

logger.Info("Schedulers policy registration", zap.Any("policy", policy))

return policy, nil
}

func validatePolicy(policy *scheduler.SchedulerPolicy) []error {
var validationErrors []error

for _, priority := range policy.Priorities {
if priority.Weight < scheduler.MinWeight || priority.Weight > scheduler.MaxWeight {
validationErrors = append(validationErrors, fmt.Errorf("priority %s should have a positive weight applied to it or it has overflown", priority.Name))
}
}
return validationErrors
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
k8s.io/apiserver v0.29.2
k8s.io/client-go v0.29.2
k8s.io/utils v0.0.0-20240102154912-e7106e64919e
knative.dev/eventing v0.42.2-0.20240923151015-7fedbd08af70
knative.dev/eventing v0.42.4-0.20241030064549-19e4c4bddaca
knative.dev/hack v0.0.0-20240704013904-b9799599afcf
knative.dev/pkg v0.0.0-20240716082220-4355f0c73608
knative.dev/reconciler-test v0.0.0-20240716134925-00d94f40c470
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1213,8 +1213,8 @@ k8s.io/utils v0.0.0-20200912215256-4140de9c8800/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20240102154912-e7106e64919e h1:eQ/4ljkx21sObifjzXwlPKpdGLrCfRziVtos3ofG/sQ=
k8s.io/utils v0.0.0-20240102154912-e7106e64919e/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
knative.dev/eventing v0.42.2-0.20240923151015-7fedbd08af70 h1:Cf6YhPrDySVcyIqHcvBonCQpyt0hlEYJuIF9pF5zIVo=
knative.dev/eventing v0.42.2-0.20240923151015-7fedbd08af70/go.mod h1:hW5BMYcihtCelT9pqaMtK8gmNOo1ybxcigjBY+/fU+k=
knative.dev/eventing v0.42.4-0.20241030064549-19e4c4bddaca h1:i5PgRTbxNX1u7CnrWnQFFV0eGky09pU14NHzeacWwPY=
knative.dev/eventing v0.42.4-0.20241030064549-19e4c4bddaca/go.mod h1:hW5BMYcihtCelT9pqaMtK8gmNOo1ybxcigjBY+/fU+k=
knative.dev/hack v0.0.0-20240704013904-b9799599afcf h1:n92FmZRywgtHso7pFAku7CW0qvRAs1hXtMQqO0R6eiE=
knative.dev/hack v0.0.0-20240704013904-b9799599afcf/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q=
knative.dev/pkg v0.0.0-20240716082220-4355f0c73608 h1:BOiRzcnRS9Z5ruxlCiS/K1/Hb5bUN0X4W3xCegdcYQE=
Expand Down

This file was deleted.

Loading

0 comments on commit df95c16

Please sign in to comment.