Skip to content

Commit

Permalink
fix subscription without qgroup (#38)
Browse files Browse the repository at this point in the history
Co-authored-by: Radu Popovici <rpopovici@totalsoft.ro>
  • Loading branch information
oncicaradupopovici and Radu Popovici authored Aug 4, 2022
1 parent ec04090 commit e0f9bf3
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
10 changes: 7 additions & 3 deletions pkg/messaging/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,10 +397,14 @@ func mergeGlobalAndSubscriptionOptions(globalOptions options, subscriptionOption
}

if subscriptionOptions.QGroup != nil {
if *subscriptionOptions.QGroup == false {
if *subscriptionOptions.QGroup {
if mergedOptions.natsQueueGroupName == "" {
return mergedOptions, errors.New("nats-streaming error: missing queue group name")
}
mergedOptions.subscriptionType = subscriptionTypeQueueGroup
} else {
mergedOptions.natsQueueGroupName = ""
} else if mergedOptions.natsQueueGroupName == "" {
return mergedOptions, errors.New("nats-streaming error: missing queue group name")
mergedOptions.subscriptionType = subscriptionTypeTopic
}
}
if subscriptionOptions.MaxConcurrentMessages != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/messaging/nats/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,10 +412,13 @@ func TestMergeGlobalAndSubscriptionOptions(t *testing.T) {
{"test subscription options with qGroup false", args{globalSubscriptionOptions, &messaging.SubscriptionOptions{QGroup: &qGroupFalse}}, func() options {
result := globalSubscriptionOptions
result.natsQueueGroupName = ""
result.subscriptionType = subscriptionTypeTopic
return result
}, false},
{"test subscription options with qGroup true and global options qGroupName set", args{globalSubscriptionOptions, &messaging.SubscriptionOptions{QGroup: &qGroupTrue}}, func() options {
return globalSubscriptionOptions
result := globalSubscriptionOptions
result.subscriptionType = subscriptionTypeQueueGroup
return result
}, false},
{"test subscription options with qGroup true and global options qGroupName not set", args{options{natsQueueGroupName: ""}, &messaging.SubscriptionOptions{QGroup: &qGroupTrue}}, func() options {
return options{}
Expand Down

0 comments on commit e0f9bf3

Please sign in to comment.