Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better naming for prefetchCount env var #676

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions broker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ install
[KEDA based autoscaler](https://github.com/knative-sandbox/eventing-autoscaler-keda).

## Trigger Pre-Fetch Count
Trigger has a configurable annotation `rabbitmq.eventing.knative.dev/prefetchCount`. The following are effects of setting this parameter to `n`:
Trigger has a configurable annotation `rabbitmq.eventing.knative.dev/parallelism`. The following are effects of setting this parameter to `n`:

- Prefetch count is set on the RabbitMQ channel and queue created for this trigger. The channel will receive a maximum of `n` number of messages at once.
- Prefetch count is set to this value on the RabbitMQ channel and queue created for this trigger. The channel will receive a maximum of `n` number of messages at once.
- The trigger will create `n` workers to consume messages off the queue and dispatch to the sink.

If this value is unset, it will default to `1`. This means the trigger will only handle one event at a time. This will preserve the order of the messages in a queue but
Expand Down
76 changes: 49 additions & 27 deletions cmd/dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"go.uber.org/zap"

"github.com/NeowayLabs/wabbit"
"github.com/NeowayLabs/wabbit/amqp"
"github.com/kelseyhightower/envconfig"
amqperr "github.com/rabbitmq/amqp091-go"
Expand All @@ -43,10 +44,13 @@ type envConfig struct {
SubscriberURL string `envconfig:"SUBSCRIBER" required:"true"`

// Number of concurrent messages in flight
PrefetchCount int `envconfig:"PREFETCH_COUNT" default:"1" required:"false"`
Parallelism int `envconfig:"PARALLELISM" default:"1" required:"false"`
Retry int `envconfig:"RETRY" required:"false"`
BackoffPolicy string `envconfig:"BACKOFF_POLICY" required:"false"`
BackoffDelay time.Duration `envconfig:"BACKOFF_DELAY" default:"50ms" required:"false"`

connection *amqp.Conn
channel wabbit.Channel
}

func main() {
Expand Down Expand Up @@ -79,50 +83,68 @@ func main() {
backoffDelay := env.BackoffDelay
logging.FromContext(ctx).Infow("Setting BackoffDelay", zap.Any("backoffDelay", backoffDelay))

conn, err := amqp.Dial(env.RabbitURL)
if err != nil {
logging.FromContext(ctx).Fatal("Failed to connect to RabbitMQ: ", err)
}
env.setupRabbitMQ(ctx)
defer func() {
err = conn.Close()
err := env.connection.Close()
if err != nil && !errors.Is(err, amqperr.ErrClosed) {
logging.FromContext(ctx).Warn("Failed to close connection: ", err)
}
}()

channel, err := conn.Channel()
if err != nil {
logging.FromContext(ctx).Fatal("Failed to open a channel: ", err)
}
defer func() {
err = channel.Close()
err := env.channel.Close()
if err != nil && !errors.Is(err, amqperr.ErrClosed) {
logging.FromContext(ctx).Warn("Failed to close channel: ", err)
}
}()

err = channel.Qos(
env.PrefetchCount, // prefetch count
0, // prefetch size
false, // global
)
if err != nil {
logging.FromContext(ctx).Fatal("Failed to create QoS: ", err)
}

d := &dispatcher.Dispatcher{
BrokerIngressURL: env.BrokerIngressURL,
SubscriberURL: env.SubscriberURL,
MaxRetries: env.Retry,
BackoffDelay: backoffDelay,
BackoffPolicy: backoffPolicy,
WorkerCount: env.PrefetchCount,
WorkerCount: env.Parallelism,
}
if err := d.ConsumeFromQueue(ctx, channel, env.QueueName); err != nil {
// ignore ctx cancelled and channel closed errors
if errors.Is(err, context.Canceled) || errors.Is(err, amqperr.ErrClosed) {
return

for {
if err := d.ConsumeFromQueue(ctx, env.channel, env.QueueName); err != nil {
// ignore ctx cancelled and channel closed errors
if errors.Is(err, amqperr.ErrClosed) {
env.setupRabbitMQ(ctx)
continue
}

if errors.Is(err, context.Canceled) {
return
}

logging.FromContext(ctx).Fatal("Failed to consume from queue: ", err)
break
}
logging.FromContext(ctx).Fatal("Failed to consume from queue: ", err)
}
}

func (env *envConfig) setupRabbitMQ(ctx context.Context) {
var err error

if env.connection == nil || env.connection.IsClosed() {
env.connection, err = amqp.Dial(env.RabbitURL)
if err != nil {
logging.FromContext(ctx).Fatal("Failed to connect to RabbitMQ: ", err)
}
}

env.channel, err = env.connection.Channel()
if err != nil {
logging.FromContext(ctx).Fatal("Failed to open a channel: ", err)
}

err = env.channel.Qos(
env.Parallelism, // prefetch count
0, // prefetch size
false, // global
)
if err != nil {
logging.FromContext(ctx).Fatal("Failed to create QoS: ", err)
}
}
4 changes: 2 additions & 2 deletions config/source/300-rabbitmqsource.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ spec:
globalQos:
description: Channel Qos global property
type: boolean
prefetchCount:
description: Channel Prefetch count
parallelism:
description: Sets the Channel's Prefetch count and number of Workers to consume simultaneously from it
maximum: 1000
minimum: 1
type: integer
Expand Down
10 changes: 5 additions & 5 deletions pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ type ExchangeConfig struct {
}

type ChannelConfig struct {
PrefetchCount int `envconfig:"RABBITMQ_CHANNEL_CONFIG_PREFETCH_COUNT" default:"1" required:"false"`
GlobalQos bool `envconfig:"RABBITMQ_CHANNEL_CONFIG_QOS_GLOBAL" required:"false"`
Parallelism int `envconfig:"RABBITMQ_CHANNEL_CONFIG_PARALLELISM" default:"1" required:"false"`
GlobalQos bool `envconfig:"RABBITMQ_CHANNEL_CONFIG_QOS_GLOBAL" required:"false"`
}

type QueueConfig struct {
Expand Down Expand Up @@ -158,12 +158,12 @@ func (a *Adapter) CreateChannel(conn *amqp.Conn, connTest *amqptest.Conn,
}

logger.Info("Initializing Channel with Config: ",
zap.Int("PrefetchCount", a.config.ChannelConfig.PrefetchCount),
zap.Int("Parallelism", a.config.ChannelConfig.Parallelism),
zap.Bool("GlobalQoS", a.config.ChannelConfig.GlobalQos),
)

err = ch.Qos(
a.config.ChannelConfig.PrefetchCount,
a.config.ChannelConfig.Parallelism,
0,
a.config.ChannelConfig.GlobalQos,
)
Expand Down Expand Up @@ -232,7 +232,7 @@ func (a *Adapter) PollForMessages(channel *wabbit.Channel,
msgs, _ := a.ConsumeMessages(channel, queue, logger)

wg := &sync.WaitGroup{}
workerCount := a.config.ChannelConfig.PrefetchCount
workerCount := a.config.ChannelConfig.Parallelism
wg.Add(workerCount)
workerQueue := make(chan wabbit.Delivery, workerCount)
logger.Info("Starting GoRoutines Workers: ", zap.Int("WorkerCount", workerCount))
Expand Down
18 changes: 9 additions & 9 deletions pkg/apis/eventing/v1/trigger_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import (
)

const (
BrokerClass = "RabbitMQBroker"
prefetchAnnotation = "rabbitmq.eventing.knative.dev/prefetchCount"
BrokerClass = "RabbitMQBroker"
parallelismAnnotation = "rabbitmq.eventing.knative.dev/parallelism"
)

func ValidateTrigger(ctx context.Context) func(context.Context, *unstructured.Unstructured) error {
Expand Down Expand Up @@ -61,21 +61,21 @@ func (t *RabbitTrigger) Validate(ctx context.Context) *apis.FieldError {
return nil
}

// if prefetch is set, validate it
// if parallelism is set, validate it
// if it isn't then the default value (1) is used
prefetch, ok := t.GetAnnotations()[prefetchAnnotation]
parallelism, ok := t.GetAnnotations()[parallelismAnnotation]
if ok {
prefetchInt, err := strconv.Atoi(prefetch)
parallelismInt, err := strconv.Atoi(parallelism)
if err != nil {
return &apis.FieldError{
Message: "Failed to parse valid int from prefetchAnnotation",
Paths: []string{"metadata", "annotations", prefetchAnnotation},
Message: "Failed to parse valid int from parallelismAnnotation",
Paths: []string{"metadata", "annotations", parallelismAnnotation},
Details: err.Error(),
}
}

if prefetchInt < 1 || prefetchInt > 1000 {
return apis.ErrOutOfBoundsValue(prefetchInt, 1, 1000, prefetchAnnotation)
if parallelismInt < 1 || parallelismInt > 1000 {
return apis.ErrOutOfBoundsValue(parallelismInt, 1, 1000, parallelismAnnotation)
}
}

Expand Down
28 changes: 14 additions & 14 deletions pkg/apis/eventing/v1/trigger_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"knative.dev/pkg/apis"
)

const prefetchAnnotation = "rabbitmq.eventing.knative.dev/prefetchCount"
const parallelismAnnotation = "rabbitmq.eventing.knative.dev/parallelism"

func TestTriggerValidate(t *testing.T) {
tests := []struct {
Expand Down Expand Up @@ -65,27 +65,27 @@ func TestTriggerValidate(t *testing.T) {
},
},
{
name: "out of bounds prefetch count annotation",
trigger: trigger(withBroker("foo"), brokerExistsAndIsValid, withPrefetchCount("0")),
err: apis.ErrOutOfBoundsValue(0, 1, 1000, prefetchAnnotation),
name: "out of bounds parallelism count annotation",
trigger: trigger(withBroker("foo"), brokerExistsAndIsValid, withParallelism("0")),
err: apis.ErrOutOfBoundsValue(0, 1, 1000, parallelismAnnotation),
},
{
name: "invalid prefetch count annotation",
trigger: trigger(withBroker("foo"), brokerExistsAndIsValid, withPrefetchCount("notAnumber")),
name: "invalid parallelism count annotation",
trigger: trigger(withBroker("foo"), brokerExistsAndIsValid, withParallelism("notAnumber")),
err: &apis.FieldError{
Message: "Failed to parse valid int from prefetchAnnotation",
Paths: []string{"metadata", "annotations", prefetchAnnotation},
Message: "Failed to parse valid int from parallelismAnnotation",
Paths: []string{"metadata", "annotations", parallelismAnnotation},
Details: `strconv.Atoi: parsing "notAnumber": invalid syntax`,
},
},
{
name: "update prefetch count annotation",
trigger: trigger(withBroker("foo"), brokerExistsAndIsValid, withPrefetchCount("100")),
name: "update parallelism count annotation",
trigger: trigger(withBroker("foo"), brokerExistsAndIsValid, withParallelism("100")),
original: trigger(withBroker("foo"), brokerExistsAndIsValid),
},
{
name: "valid prefetch count annotation",
trigger: trigger(withBroker("foo"), brokerExistsAndIsValid, withPrefetchCount("100")),
name: "valid Parallelisp count annotation",
trigger: trigger(withBroker("foo"), brokerExistsAndIsValid, withParallelism("100")),
},
}
for _, tc := range tests {
Expand Down Expand Up @@ -156,12 +156,12 @@ func withFilters(filters ...[]string) triggerOpt {
}
}

func withPrefetchCount(prefetchCount string) triggerOpt {
func withParallelism(parallelism string) triggerOpt {
return func(t *v1.RabbitTrigger) {
if t.Annotations == nil {
t.Annotations = map[string]string{}
}

t.Annotations[prefetchAnnotation] = prefetchCount
t.Annotations[parallelismAnnotation] = parallelism
}
}
6 changes: 3 additions & 3 deletions pkg/apis/sources/v1alpha1/rabbitmq_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ func (r *RabbitmqSource) SetDefaults(ctx context.Context) {
}

func (chConf *RabbitmqChannelConfigSpec) SetDefaults(ctx context.Context) {
if chConf.PrefetchCount == nil {
defaultPrefetchCount := 1
chConf.PrefetchCount = &defaultPrefetchCount
if chConf.Parallelism == nil {
defaultParallelism := 1
chConf.Parallelism = &defaultParallelism
}
}
4 changes: 2 additions & 2 deletions pkg/apis/sources/v1alpha1/rabbitmq_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ var _ apis.Validatable = (*RabbitmqSource)(nil)
var _ duckv1.KRShaped = (*RabbitmqSource)(nil)

type RabbitmqChannelConfigSpec struct {
// Channel Prefetch count
// Sets the Channel's Prefetch count and number of Workers to consume simultaneously from it
// +optional
// +kubebuilder:validation:Minimum:=1
// +kubebuilder:validation:Maximum:=1000
PrefetchCount *int `json:"prefetchCount,omitempty"`
Parallelism *int `json:"parallelism,omitempty"`
// Channel Qos global property
// +optional
GlobalQos bool `json:"globalQos,omitempty"`
Expand Down
8 changes: 4 additions & 4 deletions pkg/apis/sources/v1alpha1/rabbitmq_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (current *RabbitmqSource) Validate(ctx context.Context) *apis.FieldError {
var ignoreSpecFields cmp.Option
original := apis.GetBaseline(ctx).(*RabbitmqSource)

// Channel Prefetch Count cannot be changed when exclusive Queues are been used
// Channel Parallelism cannot be changed when exclusive Queues are been used
// because another Channel is created an it has no access to the exclusive Queue
if !current.Spec.QueueConfig.Exclusive {
ignoreSpecFields = cmpopts.IgnoreFields(RabbitmqSourceSpec{}, "ChannelConfig")
Expand All @@ -55,12 +55,12 @@ func (current *RabbitmqSource) Validate(ctx context.Context) *apis.FieldError {
}

func (chSpec *RabbitmqChannelConfigSpec) validate(ctx context.Context) *apis.FieldError {
if chSpec.PrefetchCount == nil {
if chSpec.Parallelism == nil {
return nil
}

if *chSpec.PrefetchCount < 1 || *chSpec.PrefetchCount > 1000 {
return apis.ErrOutOfBoundsValue(*chSpec.PrefetchCount, 1, 1000, "prefetchCount")
if *chSpec.Parallelism < 1 || *chSpec.Parallelism > 1000 {
return apis.ErrOutOfBoundsValue(*chSpec.Parallelism, 1, 1000, "Parallelism")
}

return nil
Expand Down
Loading