diff --git a/broker/README.md b/broker/README.md index 4f6dfa4ad1..0c9c164957 100644 --- a/broker/README.md +++ b/broker/README.md @@ -39,9 +39,9 @@ install [KEDA based autoscaler](https://github.com/knative-sandbox/eventing-autoscaler-keda). ## Trigger Pre-Fetch Count -Trigger has a configurable annotation `rabbitmq.eventing.knative.dev/prefetchCount`. The following are effects of setting this parameter to `n`: +Trigger has a configurable annotation `rabbitmq.eventing.knative.dev/parallelism`. The following are effects of setting this parameter to `n`: -- Prefetch count is set on the RabbitMQ channel and queue created for this trigger. The channel will receive a maximum of `n` number of messages at once. +- Prefetch count is set to this value on the RabbitMQ channel and queue created for this trigger. The channel will receive a maximum of `n` number of messages at once. - The trigger will create `n` workers to consume messages off the queue and dispatch to the sink. If this value is unset, it will default to `1`. This means the trigger will only handle one event at a time. This will preserve the order of the messages in a queue but diff --git a/cmd/dispatcher/main.go b/cmd/dispatcher/main.go index 1ddeb4a084..2c1282b014 100644 --- a/cmd/dispatcher/main.go +++ b/cmd/dispatcher/main.go @@ -23,6 +23,7 @@ import ( "go.uber.org/zap" + "github.com/NeowayLabs/wabbit" "github.com/NeowayLabs/wabbit/amqp" "github.com/kelseyhightower/envconfig" amqperr "github.com/rabbitmq/amqp091-go" @@ -43,10 +44,13 @@ type envConfig struct { SubscriberURL string `envconfig:"SUBSCRIBER" required:"true"` // Number of concurrent messages in flight - PrefetchCount int `envconfig:"PREFETCH_COUNT" default:"1" required:"false"` + Parallelism int `envconfig:"PARALLELISM" default:"1" required:"false"` Retry int `envconfig:"RETRY" required:"false"` BackoffPolicy string `envconfig:"BACKOFF_POLICY" required:"false"` BackoffDelay time.Duration `envconfig:"BACKOFF_DELAY" default:"50ms" required:"false"` + + connection *amqp.Conn + channel wabbit.Channel } func main() { @@ -79,50 +83,68 @@ func main() { backoffDelay := env.BackoffDelay logging.FromContext(ctx).Infow("Setting BackoffDelay", zap.Any("backoffDelay", backoffDelay)) - conn, err := amqp.Dial(env.RabbitURL) - if err != nil { - logging.FromContext(ctx).Fatal("Failed to connect to RabbitMQ: ", err) - } + env.setupRabbitMQ(ctx) defer func() { - err = conn.Close() + err := env.connection.Close() if err != nil && !errors.Is(err, amqperr.ErrClosed) { logging.FromContext(ctx).Warn("Failed to close connection: ", err) } }() - - channel, err := conn.Channel() - if err != nil { - logging.FromContext(ctx).Fatal("Failed to open a channel: ", err) - } defer func() { - err = channel.Close() + err := env.channel.Close() if err != nil && !errors.Is(err, amqperr.ErrClosed) { logging.FromContext(ctx).Warn("Failed to close channel: ", err) } }() - err = channel.Qos( - env.PrefetchCount, // prefetch count - 0, // prefetch size - false, // global - ) - if err != nil { - logging.FromContext(ctx).Fatal("Failed to create QoS: ", err) - } - d := &dispatcher.Dispatcher{ BrokerIngressURL: env.BrokerIngressURL, SubscriberURL: env.SubscriberURL, MaxRetries: env.Retry, BackoffDelay: backoffDelay, BackoffPolicy: backoffPolicy, - WorkerCount: env.PrefetchCount, + WorkerCount: env.Parallelism, } - if err := d.ConsumeFromQueue(ctx, channel, env.QueueName); err != nil { - // ignore ctx cancelled and channel closed errors - if errors.Is(err, context.Canceled) || errors.Is(err, amqperr.ErrClosed) { - return + + for { + if err := d.ConsumeFromQueue(ctx, env.channel, env.QueueName); err != nil { + // ignore ctx cancelled and channel closed errors + if errors.Is(err, amqperr.ErrClosed) { + env.setupRabbitMQ(ctx) + continue + } + + if errors.Is(err, context.Canceled) { + return + } + + logging.FromContext(ctx).Fatal("Failed to consume from queue: ", err) + break } - logging.FromContext(ctx).Fatal("Failed to consume from queue: ", err) + } +} + +func (env *envConfig) setupRabbitMQ(ctx context.Context) { + var err error + + if env.connection == nil || env.connection.IsClosed() { + env.connection, err = amqp.Dial(env.RabbitURL) + if err != nil { + logging.FromContext(ctx).Fatal("Failed to connect to RabbitMQ: ", err) + } + } + + env.channel, err = env.connection.Channel() + if err != nil { + logging.FromContext(ctx).Fatal("Failed to open a channel: ", err) + } + + err = env.channel.Qos( + env.Parallelism, // prefetch count + 0, // prefetch size + false, // global + ) + if err != nil { + logging.FromContext(ctx).Fatal("Failed to create QoS: ", err) } } diff --git a/config/source/300-rabbitmqsource.yaml b/config/source/300-rabbitmqsource.yaml index 9181050b91..790cdedfec 100644 --- a/config/source/300-rabbitmqsource.yaml +++ b/config/source/300-rabbitmqsource.yaml @@ -60,8 +60,8 @@ spec: globalQos: description: Channel Qos global property type: boolean - prefetchCount: - description: Channel Prefetch count + parallelism: + description: Sets the Channel's Prefetch count and number of Workers to consume simultaneously from it maximum: 1000 minimum: 1 type: integer diff --git a/pkg/adapter/adapter.go b/pkg/adapter/adapter.go index cd87ea699a..397e362109 100644 --- a/pkg/adapter/adapter.go +++ b/pkg/adapter/adapter.go @@ -53,8 +53,8 @@ type ExchangeConfig struct { } type ChannelConfig struct { - PrefetchCount int `envconfig:"RABBITMQ_CHANNEL_CONFIG_PREFETCH_COUNT" default:"1" required:"false"` - GlobalQos bool `envconfig:"RABBITMQ_CHANNEL_CONFIG_QOS_GLOBAL" required:"false"` + Parallelism int `envconfig:"RABBITMQ_CHANNEL_CONFIG_PARALLELISM" default:"1" required:"false"` + GlobalQos bool `envconfig:"RABBITMQ_CHANNEL_CONFIG_QOS_GLOBAL" required:"false"` } type QueueConfig struct { @@ -158,12 +158,12 @@ func (a *Adapter) CreateChannel(conn *amqp.Conn, connTest *amqptest.Conn, } logger.Info("Initializing Channel with Config: ", - zap.Int("PrefetchCount", a.config.ChannelConfig.PrefetchCount), + zap.Int("Parallelism", a.config.ChannelConfig.Parallelism), zap.Bool("GlobalQoS", a.config.ChannelConfig.GlobalQos), ) err = ch.Qos( - a.config.ChannelConfig.PrefetchCount, + a.config.ChannelConfig.Parallelism, 0, a.config.ChannelConfig.GlobalQos, ) @@ -232,7 +232,7 @@ func (a *Adapter) PollForMessages(channel *wabbit.Channel, msgs, _ := a.ConsumeMessages(channel, queue, logger) wg := &sync.WaitGroup{} - workerCount := a.config.ChannelConfig.PrefetchCount + workerCount := a.config.ChannelConfig.Parallelism wg.Add(workerCount) workerQueue := make(chan wabbit.Delivery, workerCount) logger.Info("Starting GoRoutines Workers: ", zap.Int("WorkerCount", workerCount)) diff --git a/pkg/apis/eventing/v1/trigger_validation.go b/pkg/apis/eventing/v1/trigger_validation.go index e4f59ce4a3..f13818aa1e 100644 --- a/pkg/apis/eventing/v1/trigger_validation.go +++ b/pkg/apis/eventing/v1/trigger_validation.go @@ -30,8 +30,8 @@ import ( ) const ( - BrokerClass = "RabbitMQBroker" - prefetchAnnotation = "rabbitmq.eventing.knative.dev/prefetchCount" + BrokerClass = "RabbitMQBroker" + parallelismAnnotation = "rabbitmq.eventing.knative.dev/parallelism" ) func ValidateTrigger(ctx context.Context) func(context.Context, *unstructured.Unstructured) error { @@ -61,21 +61,21 @@ func (t *RabbitTrigger) Validate(ctx context.Context) *apis.FieldError { return nil } - // if prefetch is set, validate it + // if parallelism is set, validate it // if it isn't then the default value (1) is used - prefetch, ok := t.GetAnnotations()[prefetchAnnotation] + parallelism, ok := t.GetAnnotations()[parallelismAnnotation] if ok { - prefetchInt, err := strconv.Atoi(prefetch) + parallelismInt, err := strconv.Atoi(parallelism) if err != nil { return &apis.FieldError{ - Message: "Failed to parse valid int from prefetchAnnotation", - Paths: []string{"metadata", "annotations", prefetchAnnotation}, + Message: "Failed to parse valid int from parallelismAnnotation", + Paths: []string{"metadata", "annotations", parallelismAnnotation}, Details: err.Error(), } } - if prefetchInt < 1 || prefetchInt > 1000 { - return apis.ErrOutOfBoundsValue(prefetchInt, 1, 1000, prefetchAnnotation) + if parallelismInt < 1 || parallelismInt > 1000 { + return apis.ErrOutOfBoundsValue(parallelismInt, 1, 1000, parallelismAnnotation) } } diff --git a/pkg/apis/eventing/v1/trigger_validation_test.go b/pkg/apis/eventing/v1/trigger_validation_test.go index c9e0c3abc4..c0f6eba1c2 100644 --- a/pkg/apis/eventing/v1/trigger_validation_test.go +++ b/pkg/apis/eventing/v1/trigger_validation_test.go @@ -30,7 +30,7 @@ import ( "knative.dev/pkg/apis" ) -const prefetchAnnotation = "rabbitmq.eventing.knative.dev/prefetchCount" +const parallelismAnnotation = "rabbitmq.eventing.knative.dev/parallelism" func TestTriggerValidate(t *testing.T) { tests := []struct { @@ -65,27 +65,27 @@ func TestTriggerValidate(t *testing.T) { }, }, { - name: "out of bounds prefetch count annotation", - trigger: trigger(withBroker("foo"), brokerExistsAndIsValid, withPrefetchCount("0")), - err: apis.ErrOutOfBoundsValue(0, 1, 1000, prefetchAnnotation), + name: "out of bounds parallelism count annotation", + trigger: trigger(withBroker("foo"), brokerExistsAndIsValid, withParallelism("0")), + err: apis.ErrOutOfBoundsValue(0, 1, 1000, parallelismAnnotation), }, { - name: "invalid prefetch count annotation", - trigger: trigger(withBroker("foo"), brokerExistsAndIsValid, withPrefetchCount("notAnumber")), + name: "invalid parallelism count annotation", + trigger: trigger(withBroker("foo"), brokerExistsAndIsValid, withParallelism("notAnumber")), err: &apis.FieldError{ - Message: "Failed to parse valid int from prefetchAnnotation", - Paths: []string{"metadata", "annotations", prefetchAnnotation}, + Message: "Failed to parse valid int from parallelismAnnotation", + Paths: []string{"metadata", "annotations", parallelismAnnotation}, Details: `strconv.Atoi: parsing "notAnumber": invalid syntax`, }, }, { - name: "update prefetch count annotation", - trigger: trigger(withBroker("foo"), brokerExistsAndIsValid, withPrefetchCount("100")), + name: "update parallelism count annotation", + trigger: trigger(withBroker("foo"), brokerExistsAndIsValid, withParallelism("100")), original: trigger(withBroker("foo"), brokerExistsAndIsValid), }, { - name: "valid prefetch count annotation", - trigger: trigger(withBroker("foo"), brokerExistsAndIsValid, withPrefetchCount("100")), + name: "valid Parallelisp count annotation", + trigger: trigger(withBroker("foo"), brokerExistsAndIsValid, withParallelism("100")), }, } for _, tc := range tests { @@ -156,12 +156,12 @@ func withFilters(filters ...[]string) triggerOpt { } } -func withPrefetchCount(prefetchCount string) triggerOpt { +func withParallelism(parallelism string) triggerOpt { return func(t *v1.RabbitTrigger) { if t.Annotations == nil { t.Annotations = map[string]string{} } - t.Annotations[prefetchAnnotation] = prefetchCount + t.Annotations[parallelismAnnotation] = parallelism } } diff --git a/pkg/apis/sources/v1alpha1/rabbitmq_defaults.go b/pkg/apis/sources/v1alpha1/rabbitmq_defaults.go index 4db41e6b1e..4dd82fa820 100644 --- a/pkg/apis/sources/v1alpha1/rabbitmq_defaults.go +++ b/pkg/apis/sources/v1alpha1/rabbitmq_defaults.go @@ -23,8 +23,8 @@ func (r *RabbitmqSource) SetDefaults(ctx context.Context) { } func (chConf *RabbitmqChannelConfigSpec) SetDefaults(ctx context.Context) { - if chConf.PrefetchCount == nil { - defaultPrefetchCount := 1 - chConf.PrefetchCount = &defaultPrefetchCount + if chConf.Parallelism == nil { + defaultParallelism := 1 + chConf.Parallelism = &defaultParallelism } } diff --git a/pkg/apis/sources/v1alpha1/rabbitmq_types.go b/pkg/apis/sources/v1alpha1/rabbitmq_types.go index e6537235ff..4042f9aead 100644 --- a/pkg/apis/sources/v1alpha1/rabbitmq_types.go +++ b/pkg/apis/sources/v1alpha1/rabbitmq_types.go @@ -55,11 +55,11 @@ var _ apis.Validatable = (*RabbitmqSource)(nil) var _ duckv1.KRShaped = (*RabbitmqSource)(nil) type RabbitmqChannelConfigSpec struct { - // Channel Prefetch count + // Sets the Channel's Prefetch count and number of Workers to consume simultaneously from it // +optional // +kubebuilder:validation:Minimum:=1 // +kubebuilder:validation:Maximum:=1000 - PrefetchCount *int `json:"prefetchCount,omitempty"` + Parallelism *int `json:"parallelism,omitempty"` // Channel Qos global property // +optional GlobalQos bool `json:"globalQos,omitempty"` diff --git a/pkg/apis/sources/v1alpha1/rabbitmq_validation.go b/pkg/apis/sources/v1alpha1/rabbitmq_validation.go index 49cce349cc..b6dd6f5882 100644 --- a/pkg/apis/sources/v1alpha1/rabbitmq_validation.go +++ b/pkg/apis/sources/v1alpha1/rabbitmq_validation.go @@ -30,7 +30,7 @@ func (current *RabbitmqSource) Validate(ctx context.Context) *apis.FieldError { var ignoreSpecFields cmp.Option original := apis.GetBaseline(ctx).(*RabbitmqSource) - // Channel Prefetch Count cannot be changed when exclusive Queues are been used + // Channel Parallelism cannot be changed when exclusive Queues are been used // because another Channel is created an it has no access to the exclusive Queue if !current.Spec.QueueConfig.Exclusive { ignoreSpecFields = cmpopts.IgnoreFields(RabbitmqSourceSpec{}, "ChannelConfig") @@ -55,12 +55,12 @@ func (current *RabbitmqSource) Validate(ctx context.Context) *apis.FieldError { } func (chSpec *RabbitmqChannelConfigSpec) validate(ctx context.Context) *apis.FieldError { - if chSpec.PrefetchCount == nil { + if chSpec.Parallelism == nil { return nil } - if *chSpec.PrefetchCount < 1 || *chSpec.PrefetchCount > 1000 { - return apis.ErrOutOfBoundsValue(*chSpec.PrefetchCount, 1, 1000, "prefetchCount") + if *chSpec.Parallelism < 1 || *chSpec.Parallelism > 1000 { + return apis.ErrOutOfBoundsValue(*chSpec.Parallelism, 1, 1000, "Parallelism") } return nil diff --git a/pkg/apis/sources/v1alpha1/rabbitmq_validation_test.go b/pkg/apis/sources/v1alpha1/rabbitmq_validation_test.go index 89f0857ec2..4a5546815d 100644 --- a/pkg/apis/sources/v1alpha1/rabbitmq_validation_test.go +++ b/pkg/apis/sources/v1alpha1/rabbitmq_validation_test.go @@ -25,8 +25,8 @@ import ( ) var ( - defaultPrefetchCount = 1 - fullSpec = RabbitmqSourceSpec{ + defaultParallelism = 1 + fullSpec = RabbitmqSourceSpec{ Brokers: "amqp://guest:guest@localhost:5672/", Topic: "logs_topic", ExchangeConfig: RabbitmqSourceExchangeConfigSpec{ @@ -45,8 +45,8 @@ var ( NoWait: false, }, ChannelConfig: RabbitmqChannelConfigSpec{ - PrefetchCount: &defaultPrefetchCount, - GlobalQos: false, + Parallelism: &defaultParallelism, + GlobalQos: false, }, Sink: &duckv1.Destination{ Ref: &duckv1.KReference{ @@ -196,48 +196,48 @@ func TestRabbitmqSourceCheckImmutableFields(t *testing.T) { } } -func TestRabbitmqSourceCheckChannelPrefetchCountValue(t *testing.T) { +func TestRabbitmqSourceCheckChannelParallelismValue(t *testing.T) { testCases := map[string]struct { spec *RabbitmqSourceSpec - prefetchCount int + parallelism int allowed, isInUpdate bool }{ "nil spec": { spec: nil, allowed: true, }, - "valid prefetch count": { - spec: &fullSpec, - prefetchCount: 1, - allowed: true, + "valid parallelism value": { + spec: &fullSpec, + parallelism: 1, + allowed: true, }, - "negative prefetchCount in spec": { - spec: &fullSpec, - prefetchCount: -1, - allowed: false, + "negative parallelism value in spec": { + spec: &fullSpec, + parallelism: -1, + allowed: false, }, - "out of bounds prefetchCount in spec": { - spec: &fullSpec, - prefetchCount: 1001, - allowed: false, + "out of bounds parallelism value in spec": { + spec: &fullSpec, + parallelism: 1001, + allowed: false, }, - "invalid update to prefetch count": { - spec: &fullSpec, - prefetchCount: 111, - isInUpdate: true, - allowed: false, + "invalid update to parallelism value": { + spec: &fullSpec, + parallelism: 111, + isInUpdate: true, + allowed: false, }, - "zero prefetchCount in spec on update": { - spec: &fullSpec, - prefetchCount: 0, - allowed: false, + "zero parallelism value in spec on update": { + spec: &fullSpec, + parallelism: 0, + allowed: false, }, - "out of bounds prefetchCount in spec on update": { - spec: &fullSpec, - prefetchCount: 1001, - allowed: false, + "out of bounds parallelism value in spec on update": { + spec: &fullSpec, + parallelism: 1001, + allowed: false, }, - "valid channel prefetchCount update on a non exclusive source queue": { + "valid channel parallelism value update on a non exclusive source queue": { spec: &RabbitmqSourceSpec{ Brokers: fullSpec.Brokers, Topic: fullSpec.Topic, @@ -254,9 +254,9 @@ func TestRabbitmqSourceCheckChannelPrefetchCountValue(t *testing.T) { Sink: fullSpec.Sink, ServiceAccountName: fullSpec.ServiceAccountName, }, - prefetchCount: 102, - allowed: true, - isInUpdate: true, + parallelism: 102, + allowed: true, + isInUpdate: true, }, } @@ -274,13 +274,13 @@ func TestRabbitmqSourceCheckChannelPrefetchCountValue(t *testing.T) { Spec: *tc.spec, } updated.Spec.ChannelConfig = RabbitmqChannelConfigSpec{ - PrefetchCount: &tc.prefetchCount, + Parallelism: &tc.parallelism, } ctx = apis.WithinUpdate(ctx, orig) err = updated.Validate(ctx) } else { orig.Spec.ChannelConfig = RabbitmqChannelConfigSpec{ - PrefetchCount: &tc.prefetchCount, + Parallelism: &tc.parallelism, } ctx = apis.WithinCreate(ctx) @@ -288,7 +288,7 @@ func TestRabbitmqSourceCheckChannelPrefetchCountValue(t *testing.T) { } if tc.allowed != (err == nil) { - t.Fatalf("Unexpected prefetch count value check. Expected %v. Actual %v", tc.allowed, err) + t.Fatalf("Unexpected parallelism value check. Expected %v. Actual %v", tc.allowed, err) } } }) diff --git a/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go index 7a60d3c3bb..4d92eab7f9 100644 --- a/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go @@ -30,8 +30,8 @@ import ( // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RabbitmqChannelConfigSpec) DeepCopyInto(out *RabbitmqChannelConfigSpec) { *out = *in - if in.PrefetchCount != nil { - in, out := &in.PrefetchCount, &out.PrefetchCount + if in.Parallelism != nil { + in, out := &in.Parallelism, &out.Parallelism *out = new(int) **out = **in } diff --git a/pkg/reconciler/source/resources/receive_adapter.go b/pkg/reconciler/source/resources/receive_adapter.go index 619b5ea77f..e80356b5ba 100644 --- a/pkg/reconciler/source/resources/receive_adapter.go +++ b/pkg/reconciler/source/resources/receive_adapter.go @@ -70,8 +70,8 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { Value: strconv.FormatBool(args.Source.Spec.ChannelConfig.GlobalQos), }, { - Name: "RABBITMQ_CHANNEL_CONFIG_PREFETCH_COUNT", - Value: strconv.Itoa(*args.Source.Spec.ChannelConfig.PrefetchCount), + Name: "RABBITMQ_CHANNEL_CONFIG_PARALLELISM", + Value: strconv.Itoa(*args.Source.Spec.ChannelConfig.Parallelism), }, { Name: "RABBITMQ_EXCHANGE_CONFIG_NAME", diff --git a/pkg/reconciler/source/resources/receive_adapter_test.go b/pkg/reconciler/source/resources/receive_adapter_test.go index 90101e7526..64f215d0ef 100644 --- a/pkg/reconciler/source/resources/receive_adapter_test.go +++ b/pkg/reconciler/source/resources/receive_adapter_test.go @@ -29,7 +29,7 @@ import ( func TestMakeReceiveAdapter(t *testing.T) { var retry int32 = 5 - prefetchCount := 10 + parallelism := 10 backoffDelay := "50ms" for _, tt := range []struct { @@ -90,7 +90,7 @@ func TestMakeReceiveAdapter(t *testing.T) { NoWait: false, }, ChannelConfig: v1alpha12.RabbitmqChannelConfigSpec{ - PrefetchCount: &prefetchCount, + Parallelism: ¶llelism, }, Retry: &retry, BackoffDelay: &backoffDelay, @@ -198,7 +198,7 @@ func TestMakeReceiveAdapter(t *testing.T) { Value: "false", }, { - Name: "RABBITMQ_CHANNEL_CONFIG_PREFETCH_COUNT", + Name: "RABBITMQ_CHANNEL_CONFIG_PARALLELISM", Value: "10", }, { diff --git a/pkg/reconciler/trigger/resources/dispatcher.go b/pkg/reconciler/trigger/resources/dispatcher.go index f64bd5d659..3c65a014b4 100644 --- a/pkg/reconciler/trigger/resources/dispatcher.go +++ b/pkg/reconciler/trigger/resources/dispatcher.go @@ -35,7 +35,7 @@ import ( const ( dispatcherContainerName = "dispatcher" - PrefetchAnnotation = "rabbitmq.eventing.knative.dev/prefetchCount" + ParallelismAnnotation = "rabbitmq.eventing.knative.dev/parallelism" ) // DispatcherArgs are the arguments to create a dispatcher deployment. There's @@ -117,11 +117,11 @@ func MakeDispatcherDeployment(args *DispatcherArgs) *appsv1.Deployment { }) } } - if prefetch, ok := args.Trigger.ObjectMeta.Annotations[PrefetchAnnotation]; ok { + if parallelism, ok := args.Trigger.ObjectMeta.Annotations[ParallelismAnnotation]; ok { dispatcher.Env = append(dispatcher.Env, corev1.EnvVar{ - Name: "PREFETCH_COUNT", - Value: prefetch, + Name: "PARALLELISM", + Value: parallelism, }) } return &appsv1.Deployment{ diff --git a/pkg/reconciler/trigger/resources/dispatcher_test.go b/pkg/reconciler/trigger/resources/dispatcher_test.go index f37ba1cef1..9b918cc274 100644 --- a/pkg/reconciler/trigger/resources/dispatcher_test.go +++ b/pkg/reconciler/trigger/resources/dispatcher_test.go @@ -73,9 +73,9 @@ func TestMakeDispatcherDeployment(t *testing.T) { want: deployment(deploymentNamed("testtrigger-dlx-dispatcher")), }, { - name: "with prefetch", - args: dispatcherArgs(withPrefetch("10")), - want: deployment(withEnv(corev1.EnvVar{Name: "PREFETCH_COUNT", Value: "10"})), + name: "with parallelism", + args: dispatcherArgs(withParallelism("10")), + want: deployment(withEnv(corev1.EnvVar{Name: "PARALLELISM", Value: "10"})), }, } for _, tt := range tests { @@ -205,12 +205,12 @@ func withDLX(args *DispatcherArgs) { args.DLX = true } -func withPrefetch(c string) func(*DispatcherArgs) { +func withParallelism(c string) func(*DispatcherArgs) { return func(args *DispatcherArgs) { if args.Trigger.ObjectMeta.Annotations == nil { - args.Trigger.ObjectMeta.Annotations = map[string]string{PrefetchAnnotation: c} + args.Trigger.ObjectMeta.Annotations = map[string]string{ParallelismAnnotation: c} } else { - args.Trigger.ObjectMeta.Annotations[PrefetchAnnotation] = c + args.Trigger.ObjectMeta.Annotations[ParallelismAnnotation] = c } } } diff --git a/pkg/reconciler/trigger/trigger_test.go b/pkg/reconciler/trigger/trigger_test.go index 382e35159b..8e6d1137ef 100644 --- a/pkg/reconciler/trigger/trigger_test.go +++ b/pkg/reconciler/trigger/trigger_test.go @@ -704,7 +704,7 @@ func TestReconcile(t *testing.T) { ), }}, }, { - Name: "Deployment updated when prefetch count removed", + Name: "Deployment updated when parallelism value is removed", Key: testKey, Objects: []runtime.Object{ ReadyBroker(), @@ -715,7 +715,7 @@ func TestReconcile(t *testing.T) { markReady(createQueue()), markReady(createPolicy(true)), markReady(createBinding(false)), - createDispatcherDeploymentWithPrefetchCount(), + createDispatcherDeploymentWithParallelism(), }, WantUpdates: []clientgotesting.UpdateActionImpl{{ Object: createDispatcherDeployment(), @@ -1014,7 +1014,7 @@ func createDifferentDispatcherDeployment() *appsv1.Deployment { return resources.MakeDispatcherDeployment(args) } -func createDispatcherDeploymentWithPrefetchCount() *appsv1.Deployment { +func createDispatcherDeploymentWithParallelism() *appsv1.Deployment { args := &resources.DispatcherArgs{ Trigger: &eventingv1.Trigger{ ObjectMeta: metav1.ObjectMeta{ @@ -1022,7 +1022,7 @@ func createDispatcherDeploymentWithPrefetchCount() *appsv1.Deployment { Namespace: testNS, UID: triggerUID, Annotations: map[string]string{ - resources.PrefetchAnnotation: "10", + resources.ParallelismAnnotation: "10", }, }, Spec: eventingv1.TriggerSpec{ diff --git a/samples/broker-trigger/600-trigger.yaml b/samples/broker-trigger/600-trigger.yaml index 8f53e160fb..2b5d68e133 100644 --- a/samples/broker-trigger/600-trigger.yaml +++ b/samples/broker-trigger/600-trigger.yaml @@ -7,7 +7,7 @@ metadata: # Value must be between 1 and 1000 # A value of 1 RabbitMQ Trigger behaves as a FIFO queue # Values above 1 break message ordering guarantees and can be seen as more performance oriented. - rabbitmq.eventing.knative.dev/prefetchCount: "10" + rabbitmq.eventing.knative.dev/parallelism: "10" spec: broker: default filter: diff --git a/samples/broker-trigger/README.md b/samples/broker-trigger/README.md index 8bfadfb07a..73287efb38 100644 --- a/samples/broker-trigger/README.md +++ b/samples/broker-trigger/README.md @@ -198,7 +198,7 @@ metadata: # Value must be between 1 and 1000 # A value of 1 RabbitMQ Trigger behaves as a FIFO queue # Values above 1 break message ordering guarantees and can be seen as more performance oriented. - # rabbitmq.eventing.knative.dev/prefetchCount: "10" + # rabbitmq.eventing.knative.dev/parallelism: "10" spec: broker: default filter: diff --git a/samples/source/500-source.yaml b/samples/source/500-source.yaml index e38f3ce694..81110a4049 100644 --- a/samples/source/500-source.yaml +++ b/samples/source/500-source.yaml @@ -18,7 +18,7 @@ spec: # Value must be between 1 and 1000 # A value of 1 RabbitMQ Source behaves as a FIFO queue # Values above 1 break message ordering guarantees and can be seen as more performance oriented - # prefetchCount: 10 + # parallelism: 10 globalQos: false exchangeConfig: name: "eventing-rabbitmq-source" @@ -36,7 +36,7 @@ spec: autoDelete: true # TODO: SHORT-TERM FIX # Ensures that the queue is deleted when the Source is deleted - # While exclusive is true, channelConfig.prefetchCount cannot be updated + # While exclusive is true, channelConfig.parallelism cannot be updated exclusive: true nowait: false sink: diff --git a/samples/trigger-customizations/400-trigger.yaml b/samples/trigger-customizations/400-trigger.yaml index 222e60f3a7..a8d8df6e81 100644 --- a/samples/trigger-customizations/400-trigger.yaml +++ b/samples/trigger-customizations/400-trigger.yaml @@ -21,7 +21,7 @@ metadata: # Value must be between 1 and 1000 # A value of 1 RabbitMQ Trigger behaves as a FIFO queue # Values above 1 break message ordering guarantees but can be seen as more performance oriented. - rabbitmq.eventing.knative.dev/prefetchCount: "10" + rabbitmq.eventing.knative.dev/parallelism: "10" spec: broker: default subscriber: diff --git a/samples/trigger-customizations/README.md b/samples/trigger-customizations/README.md index 29e5f5359a..697348c46c 100644 --- a/samples/trigger-customizations/README.md +++ b/samples/trigger-customizations/README.md @@ -7,13 +7,13 @@ - Set `KO_DOCKER_REPO` to an accessible container registry ## Overview -The following demo highlights the benefits and tradeoffs of setting the prefetch count to > 1 and leaving it as 1. +The following demo highlights the benefits and tradeoffs of setting the parallelism to > 1 and leaving it as 1. #### Configuration - Create a RabbitMQ cluster and broker - Create a source that will send 10 events all at once to a broker -- Create a first-in-first-out (FIFO) trigger with prefetch count unset (defaults to 1) -- Create a high-throughput trigger with prefetch count set to 10 +- Create a first-in-first-out (FIFO) trigger with parallelism unset (defaults to 1) +- Create a high-throughput trigger with parallelism set to 10 - Create a slow sink for each trigger - Observe the results through the logs of the sinks @@ -111,7 +111,7 @@ metadata: # Value must be between 1 and 1000 # A value of 1 RabbitMQ Trigger behaves as a FIFO queue # Values above 1 break message ordering guarantees but can be seen as more performance oriented. - rabbitmq.eventing.knative.dev/prefetchCount: "10" + rabbitmq.eventing.knative.dev/parallelism: "10" spec: broker: default subscriber: diff --git a/source/README.md b/source/README.md index 6cf20f4cad..5b2c6e49e5 100644 --- a/source/README.md +++ b/source/README.md @@ -179,7 +179,7 @@ Source parameters | `queueConfig.autoDelete` * | Boolean | | `queueConfig.exclusive` * | Boolean | | `queueConfig.nowait` * | Boolean | -| `channelConfig.prefetchCount` * | Int that limits the [Consumer Prefetch Value](https://www.rabbitmq.com/consumer-prefetch.html). Default value is `1`. Value must be between `1` and `1000`. With a value of `1` the RabbitMQ Source process events in FIFO order, values above `1` break message ordering guarantees and can be seen as more performance oriented. | +| `channelConfig.parallelism` * | Int that sets the [Consumer Prefetch Value](https://www.rabbitmq.com/consumer-prefetch.html) and creates `n` parallel consumer processes. Default value is `1`. Value must be between `1` and `1000`. With a value of `1` the RabbitMQ Source process events in FIFO order, values above `1` break message ordering guarantees and can be seen as more performance oriented. | | `channelConfig.globalQos` * | Boolean defining how the [Consumer Sharing Limit](https://www.rabbitmq.com/consumer-prefetch.html#sharing-the-limit) is handled. | | `sink` | A reference to an [Addressable](https://knative.dev/docs/eventing/#event-consumers) Kubernetes object | @@ -240,7 +240,7 @@ The Source will provide output information about readiness or errors via the the network for consumers before receiving delivery acks. The intent of Qos is to make sure the network buffers stay full between the server and client. - 2. With a prefetch count greater than zero, the server will deliver that many + 2. With a parallelism greater than zero, the server will deliver that many messages to consumers before acknowledgments are received. The server ignores this option when consumers are started with noAck because no acknowledgments are expected or sent. @@ -255,12 +255,12 @@ The Source will provide output information about readiness or errors via the channels, not connections (https://www.rabbitmq.com/consumer-prefetch.html). 5. To get round-robin behavior between consumers consuming from the same queue on - different connections, set the prefetch count to 1, and the next available + different connections, set the parallelism to 1, and the next available message on the server will be delivered to the next available consumer. 6. If your consumer work time is reasonably consistent and not much greater than two times your network round trip time, you will see significant - throughput improvements starting with a prefetch count of 2 or slightly + throughput improvements starting with a prallelism of 2 or slightly greater as described by benchmarks on RabbitMQ. 7. http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/ diff --git a/test/e2e/config/brokertrigger/brokertrigger.go b/test/e2e/config/brokertrigger/brokertrigger.go index 12b789114e..fdcca07d8b 100644 --- a/test/e2e/config/brokertrigger/brokertrigger.go +++ b/test/e2e/config/brokertrigger/brokertrigger.go @@ -30,19 +30,19 @@ func init() { } type Topology struct { - MessageCount, PrefetchCount int - Triggers []duckv1.KReference + MessageCount, Parallelism int + Triggers []duckv1.KReference } func Install(topology Topology) feature.StepFn { - if topology.PrefetchCount == 0 { - topology.PrefetchCount = 1 + if topology.Parallelism == 0 { + topology.Parallelism = 1 } args := map[string]interface{}{ - "messageCount": topology.MessageCount, - "triggers": topology.Triggers, - "prefetchCount": topology.PrefetchCount, + "messageCount": topology.MessageCount, + "triggers": topology.Triggers, + "Parallelism": topology.Parallelism, } return func(ctx context.Context, t feature.T) { if _, err := manifest.InstallLocalYaml(ctx, args); err != nil { diff --git a/test/e2e/config/brokertrigger/trigger.yaml b/test/e2e/config/brokertrigger/trigger.yaml index c4a96bc552..4f5ee8b237 100644 --- a/test/e2e/config/brokertrigger/trigger.yaml +++ b/test/e2e/config/brokertrigger/trigger.yaml @@ -6,7 +6,7 @@ metadata: name: {{ printf "%s-%d" $kref.Name $index }} namespace: {{ $.namespace }} annotations: - rabbitmq.eventing.knative.dev/prefetchCount: '{{ $.prefetchCount }}' + rabbitmq.eventing.knative.dev/parallelism: '{{ $.Parallelism }}' spec: broker: testbroker subscriber: diff --git a/test/e2e/config/source/rabbitmq-source.yaml b/test/e2e/config/source/rabbitmq-source.yaml index 68df280ba7..949f873d43 100644 --- a/test/e2e/config/source/rabbitmq-source.yaml +++ b/test/e2e/config/source/rabbitmq-source.yaml @@ -45,7 +45,7 @@ spec: exclusive: false nowait: false channelConfig: - prefetchCount: 10 + parallelism: 10 sink: ref: apiVersion: v1 diff --git a/test/e2e/dispatcher_test.go b/test/e2e/dispatcher_test.go index 0120671b06..83d765c7b0 100644 --- a/test/e2e/dispatcher_test.go +++ b/test/e2e/dispatcher_test.go @@ -41,8 +41,8 @@ func ConcurrentDispatchTest() *feature.Feature { f.Setup("install recorder", eventshub.Install("recorder", eventshub.StartReceiver, eventshub.ResponseWaitTime(time.Second))) f.Setup("install test resources", brokertrigger.Install(brokertrigger.Topology{ - MessageCount: 2, - PrefetchCount: 10, + MessageCount: 2, + Parallelism: 10, Triggers: []duckv1.KReference{{ Kind: "Service", Name: "recorder", diff --git a/test/performance/broker-rmq/100-broker-perf-setup.yaml b/test/performance/broker-rmq/100-broker-perf-setup.yaml index 41d10a3a3a..40bd154373 100644 --- a/test/performance/broker-rmq/100-broker-perf-setup.yaml +++ b/test/performance/broker-rmq/100-broker-perf-setup.yaml @@ -113,7 +113,7 @@ metadata: name: rabbitmq-broker-perf namespace: perf-eventing annotations: - rabbitmq.eventing.knative.dev/prefetchCount: "80" + rabbitmq.eventing.knative.dev/parallelism: "80" spec: broker: rabbitmq-test-broker subscriber: