diff --git a/pkg/reconciler/broker/broker.go b/pkg/reconciler/broker/broker.go index abe6bedfb1..4eb8029a67 100644 --- a/pkg/reconciler/broker/broker.go +++ b/pkg/reconciler/broker/broker.go @@ -242,6 +242,7 @@ func (r *Reconciler) reconcileDLXDispatcherDeployment(ctx context.Context, b *ev Broker: b, Image: r.dispatcherImage, //ServiceAccountName string + Delivery: b.Spec.Delivery, RabbitMQSecretName: resources.SecretName(b.Name), QueueName: naming.CreateBrokerDeadLetterQueueName(b), BrokerUrlSecretKey: resources.BrokerURLSecretKey, diff --git a/pkg/reconciler/broker/broker_test.go b/pkg/reconciler/broker/broker_test.go index dda436601e..8345fca741 100644 --- a/pkg/reconciler/broker/broker_test.go +++ b/pkg/reconciler/broker/broker_test.go @@ -1224,6 +1224,7 @@ func createDispatcherDeployment() *appsv1.Deployment { } args := &resources.DispatcherArgs{ Broker: broker, + Delivery: delivery, Image: dispatcherImage, RabbitMQSecretName: rabbitBrokerSecretName, QueueName: "b.test-namespace.test-broker.dlq.broker-test-uid", diff --git a/pkg/reconciler/broker/resources/dispatcher.go b/pkg/reconciler/broker/resources/dispatcher.go index 229d2f35b0..5dffa3f08c 100644 --- a/pkg/reconciler/broker/resources/dispatcher.go +++ b/pkg/reconciler/broker/resources/dispatcher.go @@ -23,6 +23,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/apis/eventing" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" reconcilersource "knative.dev/eventing/pkg/reconciler/source" @@ -39,8 +40,9 @@ const ( // DispatcherArgs are the arguments to create a Broker's Dispatcher Deployment that handles // DeadLetterSink deliveries. type DispatcherArgs struct { - Broker *eventingv1.Broker - Image string + Delivery *eventingduckv1.DeliverySpec + Broker *eventingv1.Broker + Image string //ServiceAccountName string RabbitMQHost string RabbitMQSecretName string @@ -88,6 +90,36 @@ func MakeDispatcherDeployment(args *DispatcherArgs) *appsv1.Deployment { if args.Configs != nil { envs = append(envs, args.Configs.ToEnvVars()...) } + if args.Delivery != nil { + if args.Delivery.Retry != nil { + envs = append(envs, + corev1.EnvVar{ + Name: "RETRY", + Value: fmt.Sprint(*args.Delivery.Retry), + }) + + } else { + envs = append(envs, + corev1.EnvVar{ + Name: "RETRY", + Value: "5", + }) + } + if args.Delivery.BackoffPolicy != nil { + envs = append(envs, + corev1.EnvVar{ + Name: "BACKOFF_POLICY", + Value: string(*args.Delivery.BackoffPolicy), + }) + } + if args.Delivery.BackoffDelay != nil { + envs = append(envs, + corev1.EnvVar{ + Name: "BACKOFF_DELAY", + Value: *args.Delivery.BackoffDelay, + }) + } + } return &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Namespace: args.Broker.Namespace, diff --git a/pkg/reconciler/broker/resources/dispatcher_test.go b/pkg/reconciler/broker/resources/dispatcher_test.go index 753453666e..069fca484e 100644 --- a/pkg/reconciler/broker/resources/dispatcher_test.go +++ b/pkg/reconciler/broker/resources/dispatcher_test.go @@ -23,8 +23,10 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "knative.dev/eventing/pkg/apis/duck/v1" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/pkg/apis" + "knative.dev/pkg/ptr" "knative.dev/pkg/system" _ "knative.dev/pkg/system/testing" @@ -49,6 +51,7 @@ func TestMakeDispatcherDeployment(t *testing.T) { broker := &eventingv1.Broker{ ObjectMeta: metav1.ObjectMeta{Name: brokerName, Namespace: ns}, } + linear := v1.BackoffPolicyLinear args := &DispatcherArgs{ Broker: broker, Image: image, @@ -58,6 +61,11 @@ func TestMakeDispatcherDeployment(t *testing.T) { BrokerUrlSecretKey: brokerURLKey, Subscriber: sURL, BrokerIngressURL: bURL, + Delivery: &v1.DeliverySpec{ + Retry: ptr.Int32(10), + BackoffDelay: ptr.String("20s"), + BackoffPolicy: &linear, + }, } got := MakeDispatcherDeployment(args) @@ -122,6 +130,15 @@ func TestMakeDispatcherDeployment(t *testing.T) { }, { Name: "BROKER_INGRESS_URL", Value: brokerIngressURL, + }, { + Name: "RETRY", + Value: "10", + }, { + Name: "BACKOFF_POLICY", + Value: "linear", + }, { + Name: "BACKOFF_DELAY", + Value: "20s", }}, }}, }, diff --git a/pkg/reconciler/trigger/resources/dispatcher.go b/pkg/reconciler/trigger/resources/dispatcher.go index 3c65a014b4..8007d1d8b8 100644 --- a/pkg/reconciler/trigger/resources/dispatcher.go +++ b/pkg/reconciler/trigger/resources/dispatcher.go @@ -116,6 +116,13 @@ func MakeDispatcherDeployment(args *DispatcherArgs) *appsv1.Deployment { Value: string(*args.Delivery.BackoffPolicy), }) } + if args.Delivery.BackoffDelay != nil { + dispatcher.Env = append(dispatcher.Env, + corev1.EnvVar{ + Name: "BACKOFF_DELAY", + Value: *args.Delivery.BackoffDelay, + }) + } } if parallelism, ok := args.Trigger.ObjectMeta.Annotations[ParallelismAnnotation]; ok { dispatcher.Env = append(dispatcher.Env, diff --git a/pkg/reconciler/trigger/resources/dispatcher_test.go b/pkg/reconciler/trigger/resources/dispatcher_test.go index 9b918cc274..10c189e08f 100644 --- a/pkg/reconciler/trigger/resources/dispatcher_test.go +++ b/pkg/reconciler/trigger/resources/dispatcher_test.go @@ -26,6 +26,7 @@ import ( eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/pkg/apis" + "knative.dev/pkg/ptr" "knative.dev/pkg/system" _ "knative.dev/pkg/system/testing" ) @@ -57,14 +58,16 @@ func TestMakeDispatcherDeployment(t *testing.T) { want: deployment(), }, { - name: "with retry and backoff", + name: "with delivery spec", args: dispatcherArgs(withDelivery(&eventingduckv1.DeliverySpec{ Retry: Int32Ptr(10), BackoffPolicy: &exponentialBackoff, + BackoffDelay: ptr.String("20s"), })), want: deployment( withEnv(corev1.EnvVar{Name: "RETRY", Value: "10"}), withEnv(corev1.EnvVar{Name: "BACKOFF_POLICY", Value: "exponential"}), + withEnv(corev1.EnvVar{Name: "BACKOFF_DELAY", Value: "20s"}), ), }, { diff --git a/pkg/reconciler/trigger/trigger.go b/pkg/reconciler/trigger/trigger.go index 664ba201a8..dae91ee191 100644 --- a/pkg/reconciler/trigger/trigger.go +++ b/pkg/reconciler/trigger/trigger.go @@ -209,7 +209,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1.Trigger) p } t.Status.MarkDeadLetterSinkResolvedSucceeded() t.Status.DeadLetterSinkURI = deadLetterSinkURI - _, err = r.reconcileDLXDispatcherDeployment(ctx, t, deadLetterSinkURI) + _, err = r.reconcileDispatcherDeployment(ctx, t, deadLetterSinkURI, t.Spec.Delivery, true) if err != nil { logging.FromContext(ctx).Error("Problem reconciling DLX dispatcher Deployment", zap.Error(err)) t.Status.MarkDependencyFailed("DeploymentFailure", "%v", err) @@ -293,7 +293,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1.Trigger) p delivery = broker.Spec.Delivery } - _, err = r.reconcileDispatcherDeployment(ctx, t, subscriberURI, delivery) + _, err = r.reconcileDispatcherDeployment(ctx, t, subscriberURI, delivery, false) if err != nil { logging.FromContext(ctx).Error("Problem reconciling dispatcher Deployment", zap.Error(err)) t.Status.MarkDependencyFailed("DeploymentFailure", "%v", err) @@ -328,7 +328,7 @@ func (r *Reconciler) reconcileDeployment(ctx context.Context, d *v1.Deployment) } // reconcileDispatcherDeployment reconciles Trigger's dispatcher deployment. -func (r *Reconciler) reconcileDispatcherDeployment(ctx context.Context, t *eventingv1.Trigger, sub *apis.URL, delivery *eventingduckv1.DeliverySpec) (*v1.Deployment, error) { +func (r *Reconciler) reconcileDispatcherDeployment(ctx context.Context, t *eventingv1.Trigger, sub *apis.URL, delivery *eventingduckv1.DeliverySpec, dlq bool) (*v1.Deployment, error) { rabbitmqSecret, err := r.getRabbitmqSecret(ctx, t) if err != nil { return nil, err @@ -337,39 +337,23 @@ func (r *Reconciler) reconcileDispatcherDeployment(ctx context.Context, t *event if err != nil { return nil, err } - expected := resources.MakeDispatcherDeployment(&resources.DispatcherArgs{ - Trigger: t, - Image: r.dispatcherImage, - RabbitMQSecretName: rabbitmqSecret.Name, - QueueName: naming.CreateTriggerQueueName(t), - BrokerUrlSecretKey: brokerresources.BrokerURLSecretKey, - BrokerIngressURL: b.Status.Address.URL, - Subscriber: sub, - Delivery: delivery, - Configs: r.configs, - }) - return r.reconcileDeployment(ctx, expected) -} -// reconcileDispatcherDeployment reconciles Trigger's dispatcher deployment. -func (r *Reconciler) reconcileDLXDispatcherDeployment(ctx context.Context, t *eventingv1.Trigger, sub *apis.URL) (*v1.Deployment, error) { - rabbitmqSecret, err := r.getRabbitmqSecret(ctx, t) - if err != nil { - return nil, err - } - b, err := r.brokerLister.Brokers(t.Namespace).Get(t.Spec.Broker) - if err != nil { - return nil, err + queueName := naming.CreateTriggerQueueName(t) + if dlq { + // overwrite to a dlq queueName if it's a dlq + queueName = naming.CreateTriggerDeadLetterQueueName(t) } + expected := resources.MakeDispatcherDeployment(&resources.DispatcherArgs{ Trigger: t, Image: r.dispatcherImage, RabbitMQSecretName: rabbitmqSecret.Name, - QueueName: naming.CreateTriggerDeadLetterQueueName(t), + QueueName: queueName, BrokerUrlSecretKey: brokerresources.BrokerURLSecretKey, BrokerIngressURL: b.Status.Address.URL, Subscriber: sub, - DLX: true, + DLX: dlq, + Delivery: delivery, Configs: r.configs, }) return r.reconcileDeployment(ctx, expected)