Skip to content

Commit

Permalink
Bindings: Move queue declaration settings to the binding
Browse files Browse the repository at this point in the history
Options for the queue and consumer configuration on the bindings are now
moved to the Binding config struct. This enable the user to have
different options for different handlers. This aligns more with how AMQP
generally. We also default to using qourum-queues when creating
bindings.

The PrefetchCount configuration is also moved to the binding. This
allows for different prefetch counts for different bindings.

Exchanges are now declared as a seperate step due to the fact that
RabbitMQ comes with all the default exchanges ready to be used by
default. Those exchanges have now gotten constants. Most often, users
will not need to declare any exchanges at all.

The Mandatory flag is now set on the request instead of on the client as
a whole.

The Immediate flag was removed in RabbitMQ 3 and was never really
supported by this library.

ResponseWriter no longer has the option to set the Mandatory flag.
Mandatory would close the server connection if the client is no longer
running and the reply-to queue removed.

Connections are now named. Allowing them to be identified in the
management ui.

The FanoutBinding convenience function is removed. Fanout exchanges are
a bit weird and it is probably better to use a topic binding instead.
  • Loading branch information
akarl committed Nov 7, 2024
1 parent d8c223f commit a722da3
Show file tree
Hide file tree
Showing 22 changed files with 664 additions and 611 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
CURL ?= curl
DOCKER_COMPOSE = docker-compose
GOLANGCI_VERSION = v1.58.1
GOLANGCI_VERSION = v1.61.0
GOPATH = $(shell go env GOPATH)

all: lint test ## Run linting and testing
Expand Down
76 changes: 22 additions & 54 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

## Description

This is a framework to use [RabbitMQ] as a client/server RPC setup togheter with
the [Go amqp] implementation. The framework can manage a fully funcitonal
This is a framework to use [RabbitMQ] as a client/server RPC setup together with
the [Go amqp] implementation. The framework can manage a fully functional
message queue setup with reconnects, disconnects, graceful shutdown and other
stability mechanisms. By providing this RabbitMQ can be used as transport and
service discovery to quickly get up and running with a micro service
Expand All @@ -35,7 +35,7 @@ nomenclature is unique for RabbitMQ some prior experience is preferred.

## Project status

This project has been used in production since october 2018 handling millions of
This project has been used in production since October 2018 handling millions of
requests both as server and client.

## Server
Expand All @@ -44,7 +44,7 @@ The server is inspired by the HTTP library where the user maps a [RabbitMQ
binding] to a handler function. A response writer is passed to the handler which
may be used as an `io.Writer` to write the response.

This is an an example of how to get up and running with a server responding to
This is an example of how to get up and running with a server responding to
all messages published to the given routing key.

```go
Expand All @@ -67,7 +67,6 @@ type.

```go
server.Bind(DirectBinding("routing_key", handleFunc))
server.Bind(FanoutBinding("fanout-exchange", handleFunc))
server.Bind(TopicBinding("queue-name", "routing_key.#", handleFunc))
server.Bind(HeadersBinding("queue-name", amqp.Table{"x-match": "all", "foo": "bar"}, handleFunc))
```
Expand All @@ -76,14 +75,9 @@ If the default variables doesn't result in the desired result you can setup the
binding with the type manually.

```go
customBinding := HandlerBinding{
QueueName: "oh-sweet-queue",
ExchangeName: "my-exchange",
ExchangeType: ExchangeDirect,
RoutingKey: "my-key",
BindHeaders: amqp.Table{},
Handler: handleFunc,
}
customBinding := CreateBinding("oh-sweet-queue", DefaultExchangeNameDirect, handleFunc).
WithPrefetchCount(100).
WithAutoAck(false)

server.Bind(customBinding)
```
Expand All @@ -94,18 +88,13 @@ can be changed by calling chainable methods.

```go
server := NewServer("amqp://guest:guest@localhost:5672").
WithConsumeSettings(ConsumeSettings{}).
WithQueueDeclareSettings(QueueDeclareSettings{}).
WithExchangeDeclareSettings(ExchangeDeclareSettings{}).
WithDebugLogger(log.Printf).
WithErrorLogger(log.Printf).
WithDialConfig(amqp.Config{}).
WithTLS(&tls.Config{})
```

QoS is by default set to a prefetch count of `10` and a prefetch size of `0` (no
limit). If you want to change this you can use the
`WithConsumeSettings(settings)` function.
QoS is by default set to a prefetch count of `10`. If you want to change this
you can modify the binding by setting the `PrefetchCount` to something else.

## Client

Expand Down Expand Up @@ -142,7 +131,7 @@ methods.
client := NewClient("amqp://guest:guest@localhost:5672").
WithTimeout(5000 * time.Milliseconds)

// Will not connect and may be changed untill this call.
// Will not connect and may be changed until this call.
client.Send(NewRequest().WithRoutingKey("routing_key"))
```

Expand All @@ -154,10 +143,8 @@ client := NewClient("amqp://guest:guest@localhost:5672").
WithErrorLogger(log.Printf).
WithDialConfig(amqp.Config{}).
WithTLS(&tls.Config{}).
WithQueueDeclareSettings(QueueDeclareSettings{}).
WithConsumeSettings(ConsumeSettings{}).
WithPublishSettings(PublishSettings{}).
WithConfirmMode(true),
WithReplyToConsumerArgs(amqp.Table{}).
WithConfirmMode(false),
WithTimeout(10 * Time.Second)
```

Expand All @@ -170,14 +157,12 @@ can read more here](https://www.rabbitmq.com/confirms.html#publisher-confirms)

The client is set in confirm mode by default.

You can use `WithPublishSettings` or `WithConfirmMode` to control this setting.
You can use `WithConfirmMode` to control this setting. It defaults to `true`.

```go
client := NewClient("amqp://guest:guest@localhost:5672").
WithConfirmMode(true)

client := NewClient("amqp://guest:guest@localhost:5672").
WithPublishSettings(true)
```

### Request
Expand All @@ -195,8 +180,8 @@ request := NewRequest().
WithExchange("custom.exchange").
WithRoutingKey("routing_key").
WithHeaders(amqp.Headers{}).
WithCorrelationID("custom-correlation-id").
WithTimeout(5 * time.Second).
WithMandatory(true).
WithResponse(true)
```

Expand All @@ -219,10 +204,6 @@ if err != nil {
}
```

**Note**: If you request a response when sending to a fanout exchange the
response will be the first one responded from any of the subscribers. There's
currently no way to stream multiple responses for the same request.

### Sender

The client invokes a default `SendFunc` while calling `Send()` where all the
Expand Down Expand Up @@ -352,7 +333,7 @@ func myMiddleware(next SendFunc) SendFunc {
client := NewClient("amqp://guest:guest@localhost:5672").
AddMiddleware(myMiddleware)

// Add the middleware to a singlerequest
// Add the middleware to a single request
reuqest := NewRequest().
WithRoutingKey("routing_key").
AddMiddleware(myMiddleware)
Expand All @@ -371,38 +352,25 @@ For more examples of client middlewares, see [examples/middleware].
You often want to know when a connection has been established and when it comes
to RabbitMQ also perform some post connection setup. This is enabled by the fact
that both the server and the client holds a list of `OnStarted`. The function
receives the incomming connection, outgoing connection, incomming channel and
receives the incoming connection, outgoing connection, incoming channel and
outgoing channel.

```go
type OnStartedFunc func(inputConn, outputConn *amqp.Connection, inputChannel, outputChannel *amqp.Channel)
```

As an example this is a great place to do some initial QoS setup.

```go
server := NewServer("amqp://guest:guest@localhost:5672")

setupQoS(_, _ *amqp.Connection, inChan, _ *amqp.Channel) {
err := inChan.Qos(
10, // Prefetch count
1024, // Prefetch size
true, // Global
)

if err != nil {
panic(err.Error())
}
server := NewServer("amqp://guest:guest@localhost:5672").
OnStarted(func(inConn, outConn *amqp.Connection, inChan, outChan *amqp.Channel) {
// Do something after connection here...
})
}

// Setup QoS when the connection is established.
server.OnStarted(setupQoS)

server.ListenAndServe()
```

Both the server and the client follow the recommendations for [RabbitMQ
connections] which means separate connections for incomming and outgoing traffic
connections] which means separate connections for incoming and outgoing traffic
and separate channels for consuming and publishing messages. Because of this the
signature looks the same way for both the server and the client.

Expand Down Expand Up @@ -431,7 +399,7 @@ server.ListenAndServe()

## Logging

You can specifiy two optional loggers for debugging and errors or unexpected
You can specify two optional loggers for debugging and errors or unexpected
behaviour. By default only error logging is turned on and is logged via the log
package's standard logging.

Expand Down
6 changes: 1 addition & 5 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,7 @@ func Benchmark(b *testing.B) {
fastClient := NewClient(testURL).
WithErrorLogger(log.Printf).
WithTimeout(3 * time.Minute).
WithPublishSettings(PublishSettings{
Mandatory: true,
Immediate: false,
ConfirmMode: false,
})
WithConfirmMode(false)

defer fastClient.Stop()

Expand Down
Loading

0 comments on commit a722da3

Please sign in to comment.