Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract const for exchange types #29

Merged
merged 1 commit into from
Apr 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions integration/rabbus_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func testRabbusPublishSubscribe(t *testing.T) {

messages, err := r.Listen(rabbus.ListenConfig{
Exchange: "test_ex",
Kind: "direct",
Kind: rabbus.ExchangeDirect,
Key: "test_key",
Queue: "test_q",
})
Expand All @@ -101,7 +101,7 @@ func testRabbusPublishSubscribe(t *testing.T) {

msg := rabbus.Message{
Exchange: "test_ex",
Kind: "direct",
Kind: rabbus.ExchangeDirect,
Key: "test_key",
Payload: []byte(`foo`),
DeliveryMode: rabbus.Persistent,
Expand Down Expand Up @@ -167,7 +167,7 @@ func benchmarkEmitAsync(b *testing.B) {
for n := 0; n < b.N; n++ {
msg := rabbus.Message{
Exchange: "test_bench_ex" + strconv.Itoa(n%10),
Kind: "direct",
Kind: rabbus.ExchangeDirect,
Key: "test_key",
Payload: []byte(`foo`),
DeliveryMode: rabbus.Persistent,
Expand Down
14 changes: 8 additions & 6 deletions internal/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package amqp

import "github.com/streadway/amqp"

// Amqp interpret (implement) Amqp interface definition
type Amqp struct {
conn *amqp.Connection
ch *amqp.Channel
passiveExchange bool
}
type (
// Amqp interpret (implement) Amqp interface definition
Amqp struct {
conn *amqp.Connection
ch *amqp.Channel
passiveExchange bool
}
)

// New returns a new Amqp configured, or returning an non-nil err
// if an error occurred while creating connection or channel.
Expand Down
10 changes: 8 additions & 2 deletions rabbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@ const (
Transient uint8 = 1
// Persistent messages will be restored to durable queues and lost on non-durable queues during server restart.
Persistent uint8 = 2
// ContentTypeJSON define json content type
// ContentTypeJSON define json content type.
ContentTypeJSON = "application/json"
// ContentTypePlain define plain text content type
// ContentTypePlain define plain text content type.
ContentTypePlain = "plain/text"
// ExchangeDirect indicates the exchange is of direct type.
ExchangeDirect = "direct"
// ExchangeFanout indicates the exchange is of fanout type.
ExchangeFanout = "fanout"
// ExchangeTopic indicates the exchange is of topic type.
ExchangeTopic = "topic"

contentEncoding = "UTF-8"
)
Expand Down
12 changes: 6 additions & 6 deletions rabbus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func testCreateNewListener(t *testing.T) {
durable := true
config := ListenConfig{
Exchange: "exchange",
Kind: "direct",
Kind: ExchangeDirect,
Key: "key",
Queue: "queue",
}
Expand Down Expand Up @@ -210,7 +210,7 @@ func testFailToCreateNewListenerWhenCreateConsumerReturnsError(t *testing.T) {

_, err = r.Listen(ListenConfig{
Exchange: "exchange",
Kind: "direct",
Kind: ExchangeDirect,
Key: "key",
Queue: "queue",
})
Expand All @@ -222,7 +222,7 @@ func testFailToCreateNewListenerWhenCreateConsumerReturnsError(t *testing.T) {
func testEmitAsyncMessage(t *testing.T) {
msg := Message{
Exchange: "exchange",
Kind: "direct",
Kind: ExchangeDirect,
Key: "key",
Payload: []byte(`foo`),
}
Expand Down Expand Up @@ -291,7 +291,7 @@ outer:
func testEmitAsyncMessageFailToDeclareExchange(t *testing.T) {
msg := Message{
Exchange: "exchange",
Kind: "direct",
Kind: ExchangeDirect,
Key: "key",
Payload: []byte(`foo`),
}
Expand Down Expand Up @@ -337,7 +337,7 @@ outer:
func testEmitAsyncMessageFailToPublish(t *testing.T) {
msg := Message{
Exchange: "exchange",
Kind: "direct",
Kind: ExchangeDirect,
Key: "key",
Payload: []byte(`foo`),
}
Expand Down Expand Up @@ -384,7 +384,7 @@ func testEmitAsyncMessageEnsureBreaker(t *testing.T) {
threshold := uint32(1)
msg := Message{
Exchange: "exchange",
Kind: "direct",
Kind: ExchangeDirect,
Key: "key",
Payload: []byte(`foo`),
}
Expand Down