-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrabbitmqmgmt.go
230 lines (203 loc) · 5.88 KB
/
rabbitmqmgmt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
package main
import (
"fmt"
"github.com/codegangsta/cli"
"github.com/streadway/amqp"
"log"
"os"
)
const (
VERSION = "0.0.1"
ERROR = -1
)
func validateArgsNumber(c *cli.Context, argsNumber int, msg string) {
argc := len(c.Args())
if argc != argsNumber {
fmt.Println(msg)
os.Exit(ERROR)
}
}
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func queue_create(amqp_uri string, queue_name string, durable bool, auto_delete bool, messageTtl int32) {
conn, err := amqp.Dial(amqp_uri)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
args := amqp.Table{}
if messageTtl > 0 {
args["x-message-ttl"] = messageTtl
}
_, err = ch.QueueDeclare(
queue_name,
durable,
auto_delete,
false,
false,
args,
)
failOnError(err, "Failed to declare a queue")
}
func queue_remove(amqp_uri string, queue_name string) {
conn, err := amqp.Dial(amqp_uri)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
_, err = ch.QueueDelete(
queue_name,
false, // ifUnused
false, // ifEmpty
false) // noWait
failOnError(err, "Failed to remove a queue")
}
func queue_bind(amqp_uri string, queue_name string, exchange_name string, routing_key string) {
conn, err := amqp.Dial(amqp_uri)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.QueueBind(
queue_name,
routing_key,
exchange_name,
false, // noWait
nil) // args
failOnError(err, "Failed to bind the queue to the exchange")
}
func queue_unbind(amqp_uri string, queue_name string, exchange_name string, routing_key string) {
conn, err := amqp.Dial(amqp_uri)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.QueueUnbind(
queue_name,
routing_key,
exchange_name,
nil) // args
failOnError(err, "Failed to bind the queue to the exchange")
}
func exchange_create(amqp_uri string, queue_name string, exchange_type string, durable bool, auto_delete bool) {
conn, err := amqp.Dial(amqp_uri)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
queue_name,
exchange_type,
durable,
auto_delete,
false, // internal
false, // noWait
nil, // args
)
failOnError(err, "Failed to declare a exchange")
}
func exchange_remove(amqp_uri string, exchange_name string) {
conn, err := amqp.Dial(amqp_uri)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDelete(
exchange_name,
false, // ifUnused
false) // noWait
failOnError(err, "Failed to remove exchange")
}
func main() {
app := cli.NewApp()
app.Name = "rabbitmqmgmt"
app.Usage = "rabbitmq queue/exchage/bindings management"
app.Author = "Eduardo Ferro Aldama"
app.Email = "eduardo.ferro.aldama@gmail.com"
app.Version = VERSION
app.Flags = []cli.Flag{
cli.StringFlag{"amqp_uri, u", "amqp://guest:guest@localhost:5672/", "broker url (including vhost)"},
}
app.Commands = []cli.Command{
{
Name: "queue_add",
Usage: "add a new queue",
Flags: []cli.Flag{
cli.BoolFlag{"durable", "queue survive broker restart"},
cli.BoolFlag{"auto-delete", "queue is deleted when last consumer unsubscribes"},
cli.IntFlag{"x-message-ttl", 0, "per-queue message TTL (ms) (0 for no ttl)"},
},
Action: func(c *cli.Context) {
validateArgsNumber(c, 1, "Usage: queue_add queue_name")
messageTtl := int32(c.Int("x-message-ttl"))
queue_create(c.GlobalString("amqp_uri"), c.Args().First(), c.Bool("durable"), c.Bool("auto-delete"), messageTtl)
},
},
{
Name: "queue_remove",
Usage: "remove an existing queue",
Action: func(c *cli.Context) {
validateArgsNumber(c, 1, "Usage: queue_remove queue_name")
queue_remove(c.GlobalString("amqp_uri"), c.Args().First())
},
},
{
Name: "queue_bind",
Usage: "bind a queue to a exchange using a ginven topic/routing key",
Action: func(c *cli.Context) {
validateArgsNumber(c, 3, "Usage: queue_bind queue exchange routing_key")
queue_bind(
c.GlobalString("amqp_uri"),
c.Args().Get(0), // queue_name
c.Args().Get(1), // exchange_name
c.Args().Get(2), // routing_key/topic
)
},
},
{
Name: "queue_unbind",
Usage: "remove an existing binding",
Action: func(c *cli.Context) {
validateArgsNumber(c, 3, "Usage: queue_unbind queue exchange routing_key")
queue_unbind(
c.GlobalString("amqp_uri"),
c.Args().Get(0), // queue_name
c.Args().Get(1), // exchange_name
c.Args().Get(2), // routing_key/topic
)
},
},
{
Name: "exchange_add",
Usage: "add a new exchange",
Flags: []cli.Flag{
cli.StringFlag{"type", "direct", "exchange type (direct|fanout|topic|Header)"},
cli.BoolFlag{"durable", "exchanges survive broker restart"},
cli.BoolFlag{"auto-delete", "exchange is deleted when all queues have finished using it"},
},
Action: func(c *cli.Context) {
validateArgsNumber(c, 1, "Usage: exchange_add exchange_name")
exchange_create(c.GlobalString("amqp_uri"), c.Args().First(), c.String("type"), c.Bool("durable"), c.Bool("auto-delete"))
},
},
{
Name: "exchange_remove",
Usage: "remove an existing exchange",
Action: func(c *cli.Context) {
validateArgsNumber(c, 1, "Usage: exchange_remove exchange_name")
exchange_remove(c.GlobalString("amqp_uri"), c.Args().First())
},
},
}
app.Run(os.Args)
}