diff --git a/pkg/messaging/nats/nats.go b/pkg/messaging/nats/nats.go index d8aa316..b380d86 100644 --- a/pkg/messaging/nats/nats.go +++ b/pkg/messaging/nats/nats.go @@ -397,10 +397,14 @@ func mergeGlobalAndSubscriptionOptions(globalOptions options, subscriptionOption } if subscriptionOptions.QGroup != nil { - if *subscriptionOptions.QGroup == false { + if *subscriptionOptions.QGroup { + if mergedOptions.natsQueueGroupName == "" { + return mergedOptions, errors.New("nats-streaming error: missing queue group name") + } + mergedOptions.subscriptionType = subscriptionTypeQueueGroup + } else { mergedOptions.natsQueueGroupName = "" - } else if mergedOptions.natsQueueGroupName == "" { - return mergedOptions, errors.New("nats-streaming error: missing queue group name") + mergedOptions.subscriptionType = subscriptionTypeTopic } } if subscriptionOptions.MaxConcurrentMessages != nil { diff --git a/pkg/messaging/nats/nats_test.go b/pkg/messaging/nats/nats_test.go index 8e144ed..3ae6e7a 100644 --- a/pkg/messaging/nats/nats_test.go +++ b/pkg/messaging/nats/nats_test.go @@ -412,10 +412,13 @@ func TestMergeGlobalAndSubscriptionOptions(t *testing.T) { {"test subscription options with qGroup false", args{globalSubscriptionOptions, &messaging.SubscriptionOptions{QGroup: &qGroupFalse}}, func() options { result := globalSubscriptionOptions result.natsQueueGroupName = "" + result.subscriptionType = subscriptionTypeTopic return result }, false}, {"test subscription options with qGroup true and global options qGroupName set", args{globalSubscriptionOptions, &messaging.SubscriptionOptions{QGroup: &qGroupTrue}}, func() options { - return globalSubscriptionOptions + result := globalSubscriptionOptions + result.subscriptionType = subscriptionTypeQueueGroup + return result }, false}, {"test subscription options with qGroup true and global options qGroupName not set", args{options{natsQueueGroupName: ""}, &messaging.SubscriptionOptions{QGroup: &qGroupTrue}}, func() options { return options{}