diff --git a/integration/rabbus_integration_test.go b/integration/rabbus_integration_test.go index 3dd6c97..9d1430a 100644 --- a/integration/rabbus_integration_test.go +++ b/integration/rabbus_integration_test.go @@ -80,7 +80,7 @@ func testRabbusPublishSubscribe(t *testing.T) { messages, err := r.Listen(rabbus.ListenConfig{ Exchange: "test_ex", - Kind: "direct", + Kind: rabbus.ExchangeDirect, Key: "test_key", Queue: "test_q", }) @@ -101,7 +101,7 @@ func testRabbusPublishSubscribe(t *testing.T) { msg := rabbus.Message{ Exchange: "test_ex", - Kind: "direct", + Kind: rabbus.ExchangeDirect, Key: "test_key", Payload: []byte(`foo`), DeliveryMode: rabbus.Persistent, @@ -167,7 +167,7 @@ func benchmarkEmitAsync(b *testing.B) { for n := 0; n < b.N; n++ { msg := rabbus.Message{ Exchange: "test_bench_ex" + strconv.Itoa(n%10), - Kind: "direct", + Kind: rabbus.ExchangeDirect, Key: "test_key", Payload: []byte(`foo`), DeliveryMode: rabbus.Persistent, diff --git a/internal/amqp/amqp.go b/internal/amqp/amqp.go index d2e45cb..445c7f9 100644 --- a/internal/amqp/amqp.go +++ b/internal/amqp/amqp.go @@ -2,12 +2,14 @@ package amqp import "github.com/streadway/amqp" -// Amqp interpret (implement) Amqp interface definition -type Amqp struct { - conn *amqp.Connection - ch *amqp.Channel - passiveExchange bool -} +type ( + // Amqp interpret (implement) Amqp interface definition + Amqp struct { + conn *amqp.Connection + ch *amqp.Channel + passiveExchange bool + } +) // New returns a new Amqp configured, or returning an non-nil err // if an error occurred while creating connection or channel. diff --git a/rabbus.go b/rabbus.go index 1145da2..a3fe8a7 100644 --- a/rabbus.go +++ b/rabbus.go @@ -17,10 +17,16 @@ const ( Transient uint8 = 1 // Persistent messages will be restored to durable queues and lost on non-durable queues during server restart. Persistent uint8 = 2 - // ContentTypeJSON define json content type + // ContentTypeJSON define json content type. ContentTypeJSON = "application/json" - // ContentTypePlain define plain text content type + // ContentTypePlain define plain text content type. ContentTypePlain = "plain/text" + // ExchangeDirect indicates the exchange is of direct type. + ExchangeDirect = "direct" + // ExchangeFanout indicates the exchange is of fanout type. + ExchangeFanout = "fanout" + // ExchangeTopic indicates the exchange is of topic type. + ExchangeTopic = "topic" contentEncoding = "UTF-8" ) diff --git a/rabbus_test.go b/rabbus_test.go index c7edebe..9510ef9 100644 --- a/rabbus_test.go +++ b/rabbus_test.go @@ -153,7 +153,7 @@ func testCreateNewListener(t *testing.T) { durable := true config := ListenConfig{ Exchange: "exchange", - Kind: "direct", + Kind: ExchangeDirect, Key: "key", Queue: "queue", } @@ -210,7 +210,7 @@ func testFailToCreateNewListenerWhenCreateConsumerReturnsError(t *testing.T) { _, err = r.Listen(ListenConfig{ Exchange: "exchange", - Kind: "direct", + Kind: ExchangeDirect, Key: "key", Queue: "queue", }) @@ -222,7 +222,7 @@ func testFailToCreateNewListenerWhenCreateConsumerReturnsError(t *testing.T) { func testEmitAsyncMessage(t *testing.T) { msg := Message{ Exchange: "exchange", - Kind: "direct", + Kind: ExchangeDirect, Key: "key", Payload: []byte(`foo`), } @@ -291,7 +291,7 @@ outer: func testEmitAsyncMessageFailToDeclareExchange(t *testing.T) { msg := Message{ Exchange: "exchange", - Kind: "direct", + Kind: ExchangeDirect, Key: "key", Payload: []byte(`foo`), } @@ -337,7 +337,7 @@ outer: func testEmitAsyncMessageFailToPublish(t *testing.T) { msg := Message{ Exchange: "exchange", - Kind: "direct", + Kind: ExchangeDirect, Key: "key", Payload: []byte(`foo`), } @@ -384,7 +384,7 @@ func testEmitAsyncMessageEnsureBreaker(t *testing.T) { threshold := uint32(1) msg := Message{ Exchange: "exchange", - Kind: "direct", + Kind: ExchangeDirect, Key: "key", Payload: []byte(`foo`), }