forked from wagslane/go-rabbitmq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bindings_options.go
139 lines (120 loc) · 5.04 KB
/
bindings_options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package rabbitmq
// BindingExchangeOptions are used when binding to an exchange.
// it will verify the exchange is created before binding to it.
type BindingExchangeOptions struct {
DoBinding bool
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
BindingNoWait bool
BindingArgs Table
ExchangeArgs Table
}
// QueueDeclareOptions arguments to declare a queue
type QueueDeclareOptions struct {
DoDeclare bool
QueueName string
QueueDurable bool
QueueAutoDelete bool
QueueExclusive bool
QueueNoWait bool
QueueArgs Table
}
// QosOptions configuration
type QosOptions struct {
QOSPrefetchCount int
QOSPrefetchSize int
QOSGlobal bool
}
// WithQueueDeclare define if queue will be declare or not
func WithQueueDeclare(options *QueueDeclareOptions) {
options.DoDeclare = true
}
// WithQueueDeclareOptionsDurable sets the queue to durable, which means it won't
// be destroyed when the server restarts. It must only be bound to durable exchanges
func WithQueueDeclareOptionsDurable(options *QueueDeclareOptions) {
options.QueueDurable = true
}
// WithQueueDeclareOptionsAutoDelete sets the queue to auto delete, which means it will
// be deleted when there are no more consumers on it
func WithQueueDeclareOptionsAutoDelete(options *QueueDeclareOptions) {
options.QueueAutoDelete = true
}
// WithQueueDeclareOptionsExclusive sets the queue to exclusive, which means
// it's are only accessible by the connection that declares it and
// will be deleted when the connection closes. Channels on other connections
// will receive an error when attempting to declare, bind, consume, purge or
// delete a queue with the same name.
func WithQueueDeclareOptionsExclusive(options *QueueDeclareOptions) {
options.QueueExclusive = true
}
// WithQueueDeclareOptionsNoWait sets the queue to nowait, which means
// the queue will assume to be declared on the server. A
// channel exception will arrive if the conditions are met for existing queues
// or attempting to modify an existing queue from a different connection.
func WithQueueDeclareOptionsNoWait(options *QueueDeclareOptions) {
options.QueueNoWait = true
}
// WithQueueDeclareOptionsQuorum sets the queue a quorum type, which means multiple nodes
// in the cluster will have the messages distributed amongst them for higher reliability
func WithQueueDeclareOptionsQuorum(options *QueueDeclareOptions) {
if options.QueueArgs == nil {
options.QueueArgs = Table{}
}
options.QueueArgs["x-queue-type"] = "quorum"
}
// getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values.
func getBindingExchangeOptionsOrSetDefault(options *BindingExchangeOptions) *BindingExchangeOptions {
if options == nil {
options = &BindingExchangeOptions{
Name: "",
Kind: "direct",
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
ExchangeArgs: nil,
}
}
return options
}
// WithBindingExchangeOptionsExchangeName returns a function that sets the exchange name the queue will be bound to
func WithBindingExchangeOptionsExchangeName(name string, options *BindingExchangeOptions) {
options.Name = name
}
// WithBindingExchangeOptionsExchangeKind returns a function that sets the binding exchange kind/type
func WithBindingExchangeOptionsExchangeKind(kind string, options *BindingExchangeOptions) {
options.Kind = kind
}
// WithBindingExchangeOptionsExchangeDurable returns a function that sets the binding exchange durable flag
func WithBindingExchangeOptionsExchangeDurable(options *BindingExchangeOptions) {
options.Durable = true
}
// WithBindingExchangeOptionsExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag
func WithBindingExchangeOptionsExchangeAutoDelete(options *BindingExchangeOptions) {
options.AutoDelete = true
}
// WithBindingExchangeOptionsExchangeInternal returns a function that sets the binding exchange internal flag
func WithBindingExchangeOptionsExchangeInternal(options *BindingExchangeOptions) {
options.Internal = true
}
// WithBindingExchangeOptionsExchangeNoWait returns a function that sets the binding exchange noWait flag
func WithBindingExchangeOptionsExchangeNoWait(options *BindingExchangeOptions) {
options.NoWait = true
}
// WithBindingExchangeOptionsExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange
func WithBindingExchangeOptionsExchangeArgs(args Table, options *BindingExchangeOptions) {
options.ExchangeArgs = args
}
// WithBindingExchangeOptionsNoWait sets the bindings to nowait, which means if the queue can not be bound
// the channel will not be closed with an error.
func WithBindingExchangeOptionsNoWait(options *BindingExchangeOptions) {
options.BindingNoWait = true
}
// WithBindingExchange define if the exchange is to be binded or not
func WithBindingExchange(options *BindingExchangeOptions) {
options.DoBinding = true
}